use parking_lot::RwLock;
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::mpsc;
use tracing::{debug, info, trace};

#[path = "query_cache_types.rs"]
mod types;
pub use types::{
    AdvancedCacheStats, AdvancedQueryCacheConfig, CachedResult, InvalidationMessage, QueryKey,
    QueryType, TableDependency,
};

/// Query-result cache with TTL expiry, LRU eviction, table-dependency
/// invalidation, and hot-query refresh tracking.
///
/// All mutable state lives behind `Arc<RwLock<..>>`, so cloning the cache
/// yields a handle to the same underlying data.
pub struct AdvancedQueryCache {
    /// Cached payloads keyed by query identity.
    results: Arc<RwLock<HashMap<QueryKey, CachedResult>>>,
    /// Reverse index: table -> set of keys whose results depend on it.
    dependency_index: Arc<RwLock<HashMap<TableDependency, HashSet<QueryKey>>>>,
    /// Access-order queue; front = least recently used, back = most recent.
    lru_queue: Arc<RwLock<VecDeque<QueryKey>>>,
    /// Tuning knobs: capacity, per-type TTLs, hot threshold, dep tracking.
    config: AdvancedQueryCacheConfig,
    /// Hit/miss/eviction/expiration/invalidation/refresh counters.
    stats: Arc<RwLock<AdvancedCacheStats>>,
    /// Sender half of the invalidation channel (receiver returned by `new`).
    invalidation_tx: mpsc::UnboundedSender<InvalidationMessage>,
    /// Keys whose access count crossed `config.hot_threshold`.
    hot_queries: Arc<RwLock<HashSet<QueryKey>>>,
}
43impl AdvancedQueryCache {
44 pub fn new(
46 config: AdvancedQueryCacheConfig,
47 ) -> (Self, mpsc::UnboundedReceiver<InvalidationMessage>) {
48 let (invalidation_tx, invalidation_rx) = mpsc::unbounded_channel();
49
50 let cache = Self {
51 results: Arc::new(RwLock::new(HashMap::new())),
52 dependency_index: Arc::new(RwLock::new(HashMap::new())),
53 lru_queue: Arc::new(RwLock::new(VecDeque::new())),
54 config,
55 stats: Arc::new(RwLock::new(AdvancedCacheStats::default())),
56 invalidation_tx,
57 hot_queries: Arc::new(RwLock::new(HashSet::new())),
58 };
59
60 (cache, invalidation_rx)
61 }
62
63 pub fn new_with_receiver() -> (Self, mpsc::UnboundedReceiver<InvalidationMessage>) {
65 Self::new(AdvancedQueryCacheConfig::default())
66 }
67
68 pub fn get(&self, key: &QueryKey) -> Option<Vec<u8>> {
70 let results = self.results.read();
71
72 if let Some(result) = results.get(key) {
73 if result.is_expired() {
74 drop(results);
75 self.handle_expired(key);
76 return None;
77 }
78
79 result.record_access();
80
81 if result.access_count() >= self.config.hot_threshold {
83 let mut hot = self.hot_queries.write();
84 hot.insert(key.clone());
85 }
86
87 self.update_lru(key.clone());
89
90 self.stats.write().hits += 1;
92
93 trace!("Cache hit for query key: {:?}", key);
94 Some(result.data.clone())
95 } else {
96 self.stats.write().misses += 1;
97 trace!("Cache miss for query key: {:?}", key);
98 None
99 }
100 }
101
102 pub fn put(&self, key: QueryKey, data: Vec<u8>, dependencies: Vec<TableDependency>) {
104 let ttl = self.config.ttl_for_type(key.query_type);
105
106 self.evict_if_needed();
108
109 if self.config.enable_dependency_tracking {
111 let mut index = self.dependency_index.write();
112 for dep in &dependencies {
113 index.entry(dep.clone()).or_default().insert(key.clone());
114 }
115 }
116
117 let result = CachedResult::new(data, ttl, dependencies, key.query_type);
119
120 let mut results = self.results.write();
121 results.insert(key.clone(), result);
122 drop(results);
123
124 self.lru_queue.write().push_back(key);
126
127 self.stats.write().current_size = self.results.read().len();
129
130 debug!("Cached query result with TTL: {:?}", ttl);
131 }
132
133 pub fn invalidate_by_table(&self, table: &TableDependency) {
135 if !self.config.enable_dependency_tracking {
136 return;
137 }
138
139 let keys_to_invalidate: Vec<QueryKey> = {
140 let index = self.dependency_index.read();
141 index
142 .get(table)
143 .map(|keys| keys.iter().cloned().collect())
144 .unwrap_or_default()
145 };
146
147 let mut invalidated = 0;
148 for key in keys_to_invalidate {
149 self.remove_entry(&key);
150 invalidated += 1;
151 }
152
153 if invalidated > 0 {
154 self.stats.write().invalidations += invalidated;
155 info!(
156 "Invalidated {} cache entries for table: {:?}",
157 invalidated, table
158 );
159 }
160 }
161
162 pub fn invalidate_key(&self, key: &QueryKey) {
164 self.remove_entry(key);
165 }
166
167 pub fn clear(&self) {
169 self.results.write().clear();
170 self.dependency_index.write().clear();
171 self.lru_queue.write().clear();
172 self.hot_queries.write().clear();
173 self.stats.write().current_size = 0;
174
175 info!("Cleared all query cache entries");
176 }
177
178 pub fn stats(&self) -> AdvancedCacheStats {
180 self.stats.read().clone()
181 }
182
183 pub fn get_hot_queries_needing_refresh(&self) -> Vec<QueryKey> {
185 let results = self.results.read();
186 let hot = self.hot_queries.read();
187
188 hot.iter()
189 .filter(|key| {
190 results.get(key).is_some_and(|r| {
191 r.should_refresh(self.config.hot_threshold, self.config.refresh_interval)
192 })
193 })
194 .cloned()
195 .collect()
196 }
197
198 pub fn mark_refreshed(&self, key: &QueryKey) {
200 let mut results = self.results.write();
201 if let Some(result) = results.get_mut(key) {
202 result.created_at = Instant::now();
204 self.stats.write().refreshes += 1;
205 }
206 }
207
208 #[cfg(test)]
209 pub(crate) fn force_set_created_at(&self, key: &QueryKey, created_at: Instant) {
210 if let Some(result) = self.results.write().get_mut(key) {
211 result.created_at = created_at;
212 *result.last_accessed.write() = created_at;
213 }
214 }
215
216 pub fn invalidation_sender(&self) -> mpsc::UnboundedSender<InvalidationMessage> {
218 self.invalidation_tx.clone()
219 }
220
221 fn handle_expired(&self, key: &QueryKey) {
223 self.remove_entry(key);
224 self.stats.write().expirations += 1;
225 trace!("Removed expired cache entry: {:?}", key);
226 }
227
228 fn remove_entry(&self, key: &QueryKey) {
230 let result = self.results.write().remove(key);
231
232 if let Some(result) = result {
233 self.cleanup_dependency_index(key, &result.dependencies);
234 }
235
236 self.lru_queue.write().retain(|k| k != key);
238
239 self.hot_queries.write().remove(key);
241
242 self.stats.write().current_size = self.results.read().len();
244 }
245
246 fn cleanup_dependency_index(&self, key: &QueryKey, dependencies: &[TableDependency]) {
248 if !self.config.enable_dependency_tracking {
249 return;
250 }
251 let mut index = self.dependency_index.write();
252 for dep in dependencies {
253 if let Some(keys) = index.get_mut(dep) {
254 keys.remove(key);
255 if keys.is_empty() {
256 index.remove(dep);
257 }
258 }
259 }
260 }
261
262 fn evict_if_needed(&self) {
264 let current_size = self.results.read().len();
265
266 if current_size >= self.config.max_queries {
267 let keys_to_evict: Vec<QueryKey> = {
268 let lru = self.lru_queue.read();
269 lru.iter()
270 .take(current_size - self.config.max_queries + 1)
271 .cloned()
272 .collect()
273 };
274
275 for key in keys_to_evict {
276 self.remove_entry(&key);
277 self.stats.write().evictions += 1;
278 debug!("Evicted LRU cache entry: {:?}", key);
279 }
280 }
281 }
282
283 fn update_lru(&self, key: QueryKey) {
285 let mut lru = self.lru_queue.write();
286 lru.retain(|k| k != &key);
287 lru.push_back(key);
288 }
289
290 pub fn clear_expired(&self) -> usize {
292 let expired_keys: Vec<QueryKey> = {
293 let results = self.results.read();
294 results
295 .iter()
296 .filter(|(_, result)| result.is_expired())
297 .map(|(key, _)| key.clone())
298 .collect()
299 };
300
301 let count = expired_keys.len();
302 for key in expired_keys {
303 self.remove_entry(&key);
304 }
305
306 if count > 0 {
307 self.stats.write().expirations += count as u64;
308 debug!("Cleared {} expired cache entries", count);
309 }
310
311 count
312 }
313
314 pub fn len(&self) -> usize {
316 self.results.read().len()
317 }
318
319 pub fn is_empty(&self) -> bool {
321 self.len() == 0
322 }
323}
/// Cheap, shallow clone: every `Arc` field is reference-counted and the
/// channel sender is cloneable, so clones operate on the SAME cache state.
/// (Written by hand rather than derived; a derive would behave identically
/// since all fields are `Clone`.)
impl Clone for AdvancedQueryCache {
    fn clone(&self) -> Self {
        Self {
            results: Arc::clone(&self.results),
            dependency_index: Arc::clone(&self.dependency_index),
            lru_queue: Arc::clone(&self.lru_queue),
            config: self.config.clone(),
            stats: Arc::clone(&self.stats),
            invalidation_tx: self.invalidation_tx.clone(),
            hot_queries: Arc::clone(&self.hot_queries),
        }
    }
}

#[cfg(test)]
#[path = "query_cache_tests.rs"]
mod tests;