1use std::collections::HashMap;
12use std::time::Instant;
13
14use parking_lot::RwLock;
15
16use super::config::L1Config;
17use super::result::{CachedResult, L1Entry};
18
/// Exact-match, in-memory cache of query results with a capacity bound and
/// per-entry expiry.
///
/// Interior mutability is provided by two `parking_lot` read/write locks, so
/// every method takes `&self`.
#[derive(Debug)]
pub struct L1HotCache {
    /// Settings read by the cache: `enabled` gates `get`/`put`, `size` is the
    /// entry capacity, and `ttl` is the upper bound applied to stored results.
    config: L1Config,

    /// Cached entries keyed by the exact query string.
    entries: RwLock<HashMap<String, L1Entry>>,

    /// Recency list of `(query, last-use instant)` pairs. New and re-used
    /// keys are pushed at the back; eviction takes the element at the front.
    lru_order: RwLock<Vec<(String, Instant)>>,
}
34
35impl L1HotCache {
36 pub fn new(config: L1Config) -> Self {
38 let size = config.size;
39 Self {
40 config,
41 entries: RwLock::new(HashMap::with_capacity(size)),
42 lru_order: RwLock::new(Vec::with_capacity(size)),
43 }
44 }
45
46 pub fn get(&self, query: &str) -> Option<CachedResult> {
53 if !self.config.enabled {
54 return None;
55 }
56
57 let (result, expired) = {
59 let entries = self.entries.read();
60 match entries.get(query) {
61 None => return None,
62 Some(entry) if entry.is_expired() => (None, true),
63 Some(entry) => {
64 entry.touch();
65 (Some(entry.result.clone()), false)
66 }
67 }
68 };
69
70 if expired {
71 let mut entries = self.entries.write();
73 entries.remove(query);
74 drop(entries);
75 self.remove_from_lru(query);
76 return None;
77 }
78
79 self.update_lru(query);
82 result
83 }
84
85 pub fn put(&self, query: String, result: CachedResult) {
87 if !self.config.enabled {
88 return;
89 }
90
91 let mut entries = self.entries.write();
92
93 if entries.len() >= self.config.size && !entries.contains_key(&query) {
95 self.evict_lru(&mut entries);
96 }
97
98 let mut adjusted_result = result;
100 if adjusted_result.ttl > self.config.ttl {
101 adjusted_result.ttl = self.config.ttl;
102 }
103
104 let entry = L1Entry::new(query.clone(), adjusted_result);
106 entries.insert(query.clone(), entry);
107 drop(entries);
108 self.update_lru(&query);
109 }
110
111 pub fn remove(&self, query: &str) {
113 self.entries.write().remove(query);
114 self.remove_from_lru(query);
115 }
116
117 pub fn clear(&self) {
119 self.entries.write().clear();
120 self.lru_order.write().clear();
121 }
122
123 pub fn len(&self) -> usize {
125 self.entries.read().len()
126 }
127
128 pub fn is_empty(&self) -> bool {
130 self.len() == 0
131 }
132
133 pub fn capacity(&self) -> usize {
135 self.config.size
136 }
137
138 pub fn stats(&self) -> L1CacheStats {
140 let entries = self.entries.read();
141 let total_size: usize = entries.values().map(|e| e.result.size()).sum();
142 let total_access: u64 = entries.values().map(|e| e.access_count()).sum();
143
144 L1CacheStats {
145 entry_count: entries.len(),
146 capacity: self.config.size,
147 total_size_bytes: total_size,
148 total_accesses: total_access,
149 }
150 }
151
152 pub fn evict_expired(&self) {
154 let mut entries = self.entries.write();
155 let expired: Vec<String> = entries
156 .iter()
157 .filter(|(_, entry)| entry.is_expired())
158 .map(|(key, _)| key.clone())
159 .collect();
160
161 for key in &expired {
162 entries.remove(key);
163 }
164 drop(entries);
165
166 for key in &expired {
167 self.remove_from_lru(key);
168 }
169 }
170
171 fn update_lru(&self, query: &str) {
173 let mut lru = self.lru_order.write();
174 lru.retain(|(q, _)| q != query);
175 lru.push((query.to_string(), Instant::now()));
176 }
177
178 fn remove_from_lru(&self, query: &str) {
180 self.lru_order.write().retain(|(q, _)| q != query);
181 }
182
183 fn evict_lru(&self, entries: &mut HashMap<String, L1Entry>) {
185 let mut lru = self.lru_order.write();
186
187 let expired: Vec<String> = lru
189 .iter()
190 .filter(|(q, _)| {
191 entries
192 .get(q)
193 .map(|e| e.is_expired())
194 .unwrap_or(true)
195 })
196 .map(|(q, _)| q.clone())
197 .collect();
198
199 for key in expired {
200 entries.remove(&key);
201 lru.retain(|(q, _)| q != &key);
202 }
203
204 if entries.len() >= self.config.size {
206 if let Some((key, _)) = lru.first().cloned() {
207 entries.remove(&key);
208 lru.remove(0);
209 }
210 }
211 }
212}
213
/// Snapshot of cache counters.
///
/// A plain value type: it can be compared with `==` and built with
/// `Default::default()` (all counters zero).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct L1CacheStats {
    /// Number of entries stored when the snapshot was taken.
    pub entry_count: usize,

    /// Configured maximum number of entries.
    pub capacity: usize,

    /// Sum of the sizes reported by the stored results.
    pub total_size_bytes: usize,

    /// Sum of the access counts reported by the stored entries.
    pub total_accesses: u64,
}
229
#[cfg(test)]
mod tests {
    use super::*;
    use bytes::Bytes;
    use std::time::Duration;

    /// Builds a cache from the three configuration values the tests vary.
    fn cache_with(enabled: bool, size: usize, ttl: Duration) -> L1HotCache {
        L1HotCache::new(L1Config { enabled, size, ttl })
    }

    /// Enabled cache with room for 100 entries and a 60-second TTL.
    fn roomy_cache() -> L1HotCache {
        cache_with(true, 100, Duration::from_secs(60))
    }

    /// Wraps `data` in a `CachedResult` with fixed metadata.
    fn sample_result(data: &str) -> CachedResult {
        let payload = Bytes::from(data.to_string());
        let tables = vec!["test".to_string()];
        CachedResult::new(
            payload,
            1,
            Duration::from_secs(60),
            tables,
            Duration::from_millis(5),
        )
    }

    #[test]
    fn test_basic_get_put() {
        let cache = roomy_cache();
        let query = "SELECT * FROM users WHERE id = 1";
        let result = sample_result("user data");

        // Nothing is cached before the first put.
        assert!(cache.get(query).is_none());

        cache.put(query.to_string(), result.clone());

        let cached = cache.get(query);
        assert!(cached.is_some());
        assert_eq!(cached.unwrap().data, result.data);
    }

    #[test]
    fn test_exact_match() {
        let cache = roomy_cache();
        let query1 = "SELECT * FROM users WHERE id = 1";
        let query2 = "SELECT * FROM users WHERE id = 2";

        cache.put(query1.to_string(), sample_result("user data"));

        // Only the exact query string is a hit.
        assert!(cache.get(query1).is_some());
        assert!(cache.get(query2).is_none());
    }

    #[test]
    fn test_expiration() {
        let cache = cache_with(true, 100, Duration::from_millis(10));
        let query = "SELECT 1";

        cache.put(query.to_string(), sample_result("1"));
        assert!(cache.get(query).is_some());

        // Wait past the 10 ms TTL configured on the cache.
        std::thread::sleep(Duration::from_millis(15));
        assert!(cache.get(query).is_none());
    }

    #[test]
    fn test_lru_eviction() {
        let cache = cache_with(true, 3, Duration::from_secs(60));

        // Fill the cache to capacity.
        for (key, value) in [("query1", "1"), ("query2", "2"), ("query3", "3")] {
            cache.put(key.to_string(), sample_result(value));
        }

        // Touch query1 so query2 becomes the least recently used key.
        cache.get("query1");

        // A fourth key forces one eviction.
        cache.put("query4".to_string(), sample_result("4"));

        assert!(cache.get("query1").is_some());
        assert!(cache.get("query2").is_none());
        assert!(cache.get("query3").is_some());
        assert!(cache.get("query4").is_some());
    }

    #[test]
    fn test_clear() {
        let cache = roomy_cache();

        cache.put("query1".to_string(), sample_result("1"));
        cache.put("query2".to_string(), sample_result("2"));
        assert_eq!(cache.len(), 2);

        cache.clear();

        assert_eq!(cache.len(), 0);
        assert!(cache.is_empty());
    }

    #[test]
    fn test_remove() {
        let cache = roomy_cache();

        cache.put("query1".to_string(), sample_result("1"));
        cache.put("query2".to_string(), sample_result("2"));

        cache.remove("query1");

        assert!(cache.get("query1").is_none());
        assert!(cache.get("query2").is_some());
    }

    #[test]
    fn test_disabled_cache() {
        let cache = cache_with(false, 100, Duration::from_secs(60));

        cache.put("query".to_string(), sample_result("data"));
        assert!(cache.get("query").is_none());
    }

    #[test]
    fn test_stats() {
        let cache = roomy_cache();

        cache.put("query1".to_string(), sample_result("1"));
        cache.put("query2".to_string(), sample_result("2"));

        // Three hits in total: two on query1, one on query2.
        cache.get("query1");
        cache.get("query1");
        cache.get("query2");

        let stats = cache.stats();
        assert_eq!(stats.entry_count, 2);
        assert_eq!(stats.capacity, 100);
        assert!(stats.total_size_bytes > 0);
        // The expected total implies each stored entry already counts one
        // access before any get: 2 puts + 3 gets.
        assert_eq!(stats.total_accesses, 5);
    }

    #[test]
    fn test_evict_expired() {
        let cache = cache_with(true, 100, Duration::from_millis(10));

        cache.put("query1".to_string(), sample_result("1"));
        cache.put("query2".to_string(), sample_result("2"));

        // Let both entries outlive the 10 ms TTL.
        std::thread::sleep(Duration::from_millis(15));

        cache.evict_expired();

        assert!(cache.is_empty());
    }

    #[test]
    fn test_update_existing() {
        let cache = roomy_cache();

        cache.put("query".to_string(), sample_result("old"));
        cache.put("query".to_string(), sample_result("new"));

        let cached = cache.get("query").unwrap();
        assert_eq!(cached.data, Bytes::from("new"));
    }

    #[test]
    fn test_concurrent_hits_read_lock_only() {
        use std::sync::Arc;
        use std::thread;

        const THREADS: usize = 16;
        const ITERS_PER_THREAD: usize = 500;

        let cache = Arc::new(roomy_cache());
        cache.put("hot-query".to_string(), sample_result("hot data"));

        // Spawn every worker before joining any of them.
        let workers: Vec<_> = (0..THREADS)
            .map(|_| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for _ in 0..ITERS_PER_THREAD {
                        let r = cache.get("hot-query").expect("hit expected");
                        assert_eq!(r.data, Bytes::from("hot data"));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        // One access from the initial put plus one per successful get.
        let stats = cache.stats();
        assert_eq!(
            stats.total_accesses,
            1 + (THREADS * ITERS_PER_THREAD) as u64
        );
    }
}