1use edb_common::{
20 cache::{CachePath, EdbCachePath},
21 forking,
22};
23use eyre::Result;
24use serde::{Deserialize, Serialize};
25use serde_json::Value;
26use std::{
27 collections::{HashMap, HashSet},
28 fs,
29 path::PathBuf,
30 sync::Arc,
31};
32use tokio::sync::RwLock;
33use tracing::{debug, info, warn};
34
35#[derive(Clone, Serialize, Deserialize)]
40pub struct CacheEntry {
41 pub data: Value,
43 pub accessed_at: u64,
45}
46
47impl CacheEntry {
48 fn new(data: Value) -> Self {
49 Self {
50 data,
51 accessed_at: std::time::SystemTime::now()
52 .duration_since(std::time::UNIX_EPOCH)
53 .unwrap_or_default()
54 .as_secs(),
55 }
56 }
57
58 fn update_access_time(&mut self) {
59 self.accessed_at = std::time::SystemTime::now()
60 .duration_since(std::time::UNIX_EPOCH)
61 .unwrap_or_default()
62 .as_secs();
63 }
64}
65
/// Thread-safe, size-bounded cache of RPC responses, persisted to a single
/// JSON file on disk.
pub struct CacheManager {
    /// In-memory store, shared across async tasks behind a tokio `RwLock`.
    cache: Arc<RwLock<HashMap<String, CacheEntry>>>,
    /// Maximum number of entries kept in memory before eviction kicks in.
    max_items: u32,
    /// Path of the JSON file used for persistence (see `save_to_disk`).
    cache_file_path: PathBuf,
}
75
impl CacheManager {
    /// Create a manager backed by `cache_path`, loading any existing cache
    /// file from disk.
    ///
    /// Load failures (unreadable file, unparsable JSON) are logged and
    /// degrade to an empty cache rather than erroring — the cache is an
    /// optimization, not a source of truth. If the loaded cache is at or
    /// over `max_items`, the oldest entries are evicted immediately.
    pub fn new(max_items: u32, cache_path: PathBuf) -> Result<Self> {
        info!("Using cache file: {}", cache_path.display());

        let mut cache = if cache_path.exists() {
            match fs::read_to_string(&cache_path) {
                Ok(content) => {
                    match serde_json::from_str::<HashMap<String, CacheEntry>>(&content) {
                        Ok(loaded_cache) => {
                            info!("Loaded {} cache entries from disk", loaded_cache.len());
                            loaded_cache
                        }
                        Err(e) => {
                            warn!("Failed to parse cache file, starting with empty cache: {}", e);
                            HashMap::new()
                        }
                    }
                }
                Err(e) => {
                    warn!("Failed to read cache file, starting with empty cache: {}", e);
                    HashMap::new()
                }
            }
        } else {
            info!("No existing cache file found, starting with empty cache");
            HashMap::new()
        };

        // Trim a loaded cache down to the configured capacity up front so the
        // very first `set` doesn't have to evict a large backlog.
        if cache.len() >= max_items as usize {
            Self::evict_to_size(&mut cache, max_items as usize);
        }

        Ok(Self { cache: Arc::new(RwLock::new(cache)), max_items, cache_file_path: cache_path })
    }

    /// Look up `key`, refreshing its access time on a hit.
    ///
    /// Takes the *write* lock even though this is a read, because a hit
    /// mutates the entry's `accessed_at` (LRU bookkeeping). Returns a clone
    /// of the cached JSON value, or `None` on a miss.
    pub async fn get(&self, key: &str) -> Option<Value> {
        let mut cache = self.cache.write().await;
        if let Some(entry) = cache.get_mut(key) {
            debug!("Cache hit: {}", key);
            entry.update_access_time();
            Some(entry.data.clone())
        } else {
            debug!("Cache miss: {}", key);
            None
        }
    }

    /// Insert (or overwrite) `key` with `value`, stamped with the current
    /// time.
    ///
    /// If the cache is at capacity, a batch of the oldest entries is evicted
    /// first (see `evict_oldest`), so inserts stay amortized-cheap.
    pub async fn set(&self, key: String, value: Value) {
        let mut cache = self.cache.write().await;

        if cache.len() >= self.max_items as usize {
            Self::evict_oldest(&mut cache);
        }

        let entry = CacheEntry::new(value);
        cache.insert(key.clone(), entry);
        debug!("Cached entry: {}", key);
    }

    /// Evict ~10% of the cache (at least one entry), oldest-access first.
    ///
    /// Evicting in batches rather than one-at-a-time avoids paying the full
    /// sort in `evict_to_size` on every insert while the cache sits at
    /// capacity.
    fn evict_oldest(cache: &mut HashMap<String, CacheEntry>) {
        let to_remove = (cache.len() / 10).max(1);
        debug!("Evicting {} oldest cache entries", to_remove);

        Self::evict_to_size(cache, cache.len().saturating_sub(to_remove));
    }

    /// Persist the in-memory cache to disk, merging with whatever is already
    /// in the file.
    ///
    /// Never fails: disk errors are logged and swallowed so a full disk or
    /// permission problem cannot take down the caller — the in-memory cache
    /// keeps working either way.
    pub async fn save_to_disk(&self) -> Result<()> {
        match self.save_to_disk_impl().await {
            Ok(()) => Ok(()),
            Err(e) => {
                warn!("Failed to save cache to disk: {}. In-memory cache remains available.", e);
                Ok(())
            }
        }
    }

    /// Actual save path: re-read the file, merge it with the in-memory view
    /// (so concurrent writers don't clobber each other's entries), trim to a
    /// target size, then write atomically via a temp file + rename.
    // NOTE(review): uses blocking std::fs inside an async fn — presumably
    // acceptable because saves are infrequent and files are small; consider
    // tokio::fs or spawn_blocking if this shows up on the runtime's radar.
    async fn save_to_disk_impl(&self) -> Result<()> {
        // A corrupt/unreadable existing file degrades to "nothing to merge".
        let existing_cache = match self.load_existing_cache() {
            Ok(cache) => cache,
            Err(e) => {
                warn!("Failed to load existing cache for merge, using empty: {}", e);
                HashMap::new()
            }
        };

        let original_disk_size = existing_cache.len();

        // Snapshot (clone) under the read lock so we don't hold it across I/O.
        let current_cache = self.cache.read().await.clone();
        let current_memory_size = current_cache.len();

        let merged_cache = self.merge_caches(existing_cache, current_cache);

        let final_cache =
            self.apply_size_management(merged_cache, original_disk_size, current_memory_size).await;

        // Write-then-rename so readers never observe a half-written file.
        let temp_file = self.cache_file_path.with_extension("tmp");
        let content = serde_json::to_string_pretty(&final_cache)?;

        fs::write(&temp_file, &content)?;
        fs::rename(&temp_file, &self.cache_file_path)?;
        info!(
            "Saved {} cache entries to disk (merged from {} disk + {} memory)",
            final_cache.len(),
            original_disk_size,
            current_memory_size
        );
        Ok(())
    }

    /// Delete every entry whose key belongs to `method`, returning how many
    /// were removed.
    // Assumes cache keys are formatted as "<method>:<rest>" — matches the
    // prefix built below; TODO confirm against the key-construction site.
    pub async fn delete_by_method(&self, method: &str) -> Result<usize> {
        let mut cache = self.cache.write().await;

        let prefix = format!("{method}:");
        let keys_to_delete: Vec<String> =
            cache.keys().filter(|k| k.starts_with(&prefix)).cloned().collect();

        let deleted_count = keys_to_delete.len();
        for key in keys_to_delete {
            cache.remove(&key);
        }

        if deleted_count > 0 {
            info!("Deleted {} entries for method '{}'", deleted_count, method);
            // Snapshot then release the lock before disk I/O. A *force* save
            // (no merge) is required here: a merging save would resurrect the
            // just-deleted entries from the on-disk copy.
            let current_cache = cache.clone();
            drop(cache);
            self.force_save_to_disk(current_cache).await?;
        }

        Ok(deleted_count)
    }

    /// Delete a single entry by exact key; returns whether it existed.
    /// Like `delete_by_method`, persists with a force save so the deletion
    /// is not undone by a merge with the on-disk copy.
    pub async fn delete_by_key(&self, key: &str) -> Result<bool> {
        let mut cache = self.cache.write().await;
        let found = cache.remove(key).is_some();

        if found {
            let current_cache = cache.clone();
            drop(cache);
            self.force_save_to_disk(current_cache).await?;
        }

        Ok(found)
    }

    /// Overwrite the cache file with `cache_to_save` exactly — no merge with
    /// existing disk contents. Used after deletions, where merging would
    /// bring removed entries back. Atomic via temp file + rename.
    async fn force_save_to_disk(&self, cache_to_save: HashMap<String, CacheEntry>) -> Result<()> {
        let temp_file = self.cache_file_path.with_extension("tmp");
        let content = serde_json::to_string_pretty(&cache_to_save)?;

        fs::write(&temp_file, &content)?;
        fs::rename(&temp_file, &self.cache_file_path)?;
        info!("Force saved {} cache entries to disk (no merge)", cache_to_save.len());
        Ok(())
    }

    /// Snapshot of cache statistics as a JSON object: entry counts,
    /// utilization percentage, and the age (in seconds) of the oldest and
    /// newest entries by last access (null when the cache is empty).
    // NOTE(review): utilization divides by max_items; a manager constructed
    // with max_items == 0 would format as "inf%" — confirm that config is
    // disallowed upstream.
    pub async fn detailed_stats(&self) -> serde_json::Value {
        let cache = self.cache.read().await;
        let current_time = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        // Single pass to find min/max accessed_at; None means "cache empty".
        let mut oldest_entry = None;
        let mut newest_entry = None;

        for entry in cache.values() {
            if oldest_entry.is_none() || entry.accessed_at < oldest_entry.unwrap() {
                oldest_entry = Some(entry.accessed_at);
            }
            if newest_entry.is_none() || entry.accessed_at > newest_entry.unwrap() {
                newest_entry = Some(entry.accessed_at);
            }
        }

        serde_json::json!({
            "total_entries": cache.len(),
            "max_entries": self.max_items,
            "utilization": format!("{:.1}%", (cache.len() as f64 / self.max_items as f64) * 100.0),
            "oldest_entry_age_seconds": oldest_entry.map(|t| current_time.saturating_sub(t)),
            "newest_entry_age_seconds": newest_entry.map(|t| current_time.saturating_sub(t)),
            "cache_file_path": self.cache_file_path.display().to_string(),
        })
    }

    /// Full clone of the current in-memory cache (testing/inspection hook).
    #[allow(dead_code)]
    pub async fn get_all_entries(&self) -> HashMap<String, CacheEntry> {
        let cache = self.cache.read().await;
        cache.clone()
    }

    /// Derive the on-disk cache file path for a set of RPC endpoints.
    ///
    /// Queries every URL for its chain id and requires that they all agree;
    /// bails if the URLs span multiple chains, or if none of them responded
    /// (failed probes are filtered out, which can leave the set empty).
    /// The parent directory is created if missing.
    pub async fn get_cache_path(
        rpc_urls: &[String],
        cache_dir: Option<PathBuf>,
    ) -> Result<PathBuf> {
        // HashSet deduplicates: one reachable chain => exactly one element.
        let chain_ids: HashSet<_> =
            futures::future::join_all(rpc_urls.iter().map(|url| forking::get_chain_id(url)))
                .await
                .into_iter()
                .filter_map(Result::ok)
                .collect();

        if chain_ids.len() != 1 {
            eyre::bail!("All RPC URLs must belong to the same chain. Found: {:?}", chain_ids);
        }

        let chain_id = *chain_ids.iter().next().unwrap();

        let cache_path = EdbCachePath::new(cache_dir)
            .rpc_chain_cache_dir(chain_id)
            .unwrap_or_else(|| PathBuf::from("."))
            .join("rpc.json");

        if let Some(parent) = cache_path.parent() {
            fs::create_dir_all(parent)?;
        }

        Ok(cache_path)
    }

    /// Read and parse the current cache file; empty map if the file does not
    /// exist. Parse/read errors propagate (the caller decides how to degrade).
    fn load_existing_cache(&self) -> Result<HashMap<String, CacheEntry>> {
        if !self.cache_file_path.exists() {
            return Ok(HashMap::new());
        }

        let content = fs::read_to_string(&self.cache_file_path)?;
        let cache: HashMap<String, CacheEntry> = serde_json::from_str(&content)?;
        Ok(cache)
    }

    /// Union of disk and memory caches. On a key collision the entry with
    /// the newer `accessed_at` wins; ties go to the memory copy (`>=`),
    /// since the in-memory value is at least as fresh.
    fn merge_caches(
        &self,
        disk_cache: HashMap<String, CacheEntry>,
        memory_cache: HashMap<String, CacheEntry>,
    ) -> HashMap<String, CacheEntry> {
        let mut merged = disk_cache;

        for (key, memory_entry) in memory_cache {
            match merged.get(&key) {
                Some(disk_entry) => {
                    if memory_entry.accessed_at >= disk_entry.accessed_at {
                        merged.insert(key, memory_entry);
                    }
                }
                None => {
                    merged.insert(key, memory_entry);
                }
            }
        }

        merged
    }

    /// Decide how large the merged cache may be before writing it out, and
    /// trim it (oldest-first) to that size.
    ///
    /// Target: if the on-disk copy was at least as large as our in-memory
    /// view, cap at the disk size (presumably so this process — possibly
    /// configured with a smaller max — doesn't grow a file another, larger
    /// writer owns; TODO confirm intent). Otherwise the memory side is the
    /// bigger contributor and the file may grow, up to `max_items`.
    async fn apply_size_management(
        &self,
        mut merged_cache: HashMap<String, CacheEntry>,
        original_disk_size: usize,
        current_memory_size: usize,
    ) -> HashMap<String, CacheEntry> {
        let target_size = if original_disk_size >= current_memory_size {
            original_disk_size
        } else {
            std::cmp::min(self.max_items as usize, merged_cache.len())
        };

        if merged_cache.len() <= target_size {
            return merged_cache;
        }

        Self::evict_to_size(&mut merged_cache, target_size);
        merged_cache
    }

    /// Shrink `cache` to at most `target_size` entries by removing those with
    /// the oldest `accessed_at` first. O(n log n): snapshots (key, timestamp)
    /// pairs, sorts ascending by timestamp, removes the first `to_remove`.
    // Ties in accessed_at (second granularity) are broken by sort order over
    // HashMap iteration, so exactly which tied entry goes is unspecified.
    fn evict_to_size(cache: &mut HashMap<String, CacheEntry>, target_size: usize) {
        if cache.len() <= target_size {
            return;
        }

        let to_remove = cache.len().saturating_sub(target_size);

        let mut entries: Vec<(String, u64)> =
            cache.iter().map(|(key, entry)| (key.clone(), entry.accessed_at)).collect();

        entries.sort_by_key(|(_, accessed_at)| *accessed_at);

        let keys_to_remove: Vec<String> =
            entries.into_iter().take(to_remove).map(|(key, _)| key).collect();

        for key in &keys_to_remove {
            cache.remove(key);
        }

        debug!(
            "Evicted {} entries during merge to fit target size {}",
            keys_to_remove.len(),
            target_size
        );
    }
}
501
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;
    use tokio::time::{sleep, Duration};
    use tracing::{debug, info};

    // NOTE: several tests sleep for a full second between operations because
    // CacheEntry::accessed_at has second granularity — without a real-time
    // gap, entries tie on timestamp and eviction order becomes unspecified.

    /// Build a manager backed by a fresh temp dir; the TempDir is returned so
    /// the caller keeps it alive (dropping it deletes the cache file).
    fn create_test_cache_manager(max_items: u32) -> (CacheManager, TempDir) {
        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("test_rpc.json");
        let manager = CacheManager::new(max_items, cache_path).unwrap();
        (manager, temp_dir)
    }

    /// Basic round-trip: miss before insert, exact value back after.
    #[tokio::test]
    async fn test_cache_get_set() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing cache get/set operations");

        let (manager, _temp_dir) = create_test_cache_manager(10);

        assert!(manager.get("test_key").await.is_none());

        let test_value = serde_json::json!({"result": "test_data"});
        manager.set("test_key".to_string(), test_value.clone()).await;

        let retrieved = manager.get("test_key").await.unwrap();
        assert_eq!(retrieved, test_value);
    }

    /// Filling past capacity evicts the oldest entry and keeps the newest.
    #[tokio::test]
    async fn test_cache_eviction() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing cache eviction behavior");

        let (manager, _temp_dir) = create_test_cache_manager(3);

        // Fill to capacity with distinct (second-apart) timestamps.
        for i in 0..3 {
            let key = format!("key_{}", i);
            let value = serde_json::json!({"data": i});
            manager.set(key, value).await;
            sleep(Duration::from_secs(1)).await;
        }

        // Fourth insert triggers eviction of the oldest (key_0).
        manager.set("key_3".to_string(), serde_json::json!({"data": 3})).await;

        assert!(manager.get("key_0").await.is_none());
        assert!(manager.get("key_3").await.is_some());
    }

    /// Eviction removes specifically the least-recently-accessed entry,
    /// leaving all newer entries intact.
    #[tokio::test]
    async fn test_cache_eviction_order() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing cache eviction order");

        let (manager, _temp_dir) = create_test_cache_manager(3);

        manager.set("old_key".to_string(), serde_json::json!({"data": "old"})).await;
        sleep(Duration::from_secs(1)).await;

        manager.set("mid_key".to_string(), serde_json::json!({"data": "mid"})).await;
        sleep(Duration::from_secs(1)).await;

        manager.set("new_key".to_string(), serde_json::json!({"data": "new"})).await;
        sleep(Duration::from_secs(1)).await;

        // At capacity: this insert must evict old_key only.
        manager.set("newest_key".to_string(), serde_json::json!({"data": "newest"})).await;

        assert!(manager.get("old_key").await.is_none());
        assert!(manager.get("mid_key").await.is_some());
        assert!(manager.get("new_key").await.is_some());
        assert!(manager.get("newest_key").await.is_some());
    }

    /// Entries saved by one manager are visible to a second manager created
    /// over the same cache file (restart simulation).
    #[tokio::test]
    async fn test_cache_persistence() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing cache persistence across restarts");

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("persist_test.json");

        // Scope drops the first manager before the second opens the file.
        {
            let manager = CacheManager::new(10, cache_path.clone()).unwrap();
            manager.set("persist_key".to_string(), serde_json::json!({"persisted": true})).await;
            manager.save_to_disk().await.unwrap();
        }
        let manager2 = CacheManager::new(10, cache_path).unwrap();
        let retrieved = manager2.get("persist_key").await.unwrap();
        assert_eq!(retrieved, serde_json::json!({"persisted": true}));
    }

    /// detailed_stats: null ages on an empty cache; counts, utilization and
    /// age ordering (oldest >= newest) after two second-apart inserts.
    #[tokio::test]
    async fn test_detailed_stats() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing detailed cache statistics");

        let (manager, _temp_dir) = create_test_cache_manager(100);

        let stats = manager.detailed_stats().await;
        assert_eq!(stats["total_entries"], 0);
        assert_eq!(stats["max_entries"], 100);
        assert_eq!(stats["utilization"], "0.0%");
        assert!(stats["oldest_entry_age_seconds"].is_null());
        assert!(stats["newest_entry_age_seconds"].is_null());

        manager.set("item1".to_string(), serde_json::json!({"data": 1})).await;
        sleep(Duration::from_secs(1)).await;
        manager.set("item2".to_string(), serde_json::json!({"data": 2})).await;

        let stats = manager.detailed_stats().await;
        assert_eq!(stats["total_entries"], 2);
        assert_eq!(stats["max_entries"], 100);
        assert_eq!(stats["utilization"], "2.0%");
        // Older entry => larger age.
        assert!(
            stats["oldest_entry_age_seconds"].as_u64().unwrap()
                >= stats["newest_entry_age_seconds"].as_u64().unwrap()
        );
    }

    /// Entries created a second apart get strictly increasing timestamps.
    #[tokio::test]
    async fn test_cache_entry_timestamps() {
        edb_common::logging::ensure_test_logging(None);
        debug!("Testing cache entry timestamp behavior");

        let entry1 = CacheEntry::new(serde_json::json!({"test": 1}));
        sleep(Duration::from_secs(1)).await;
        let entry2 = CacheEntry::new(serde_json::json!({"test": 2}));

        assert!(entry2.accessed_at > entry1.accessed_at);
    }

    /// save_to_disk merges: disk-only keys survive, memory-only keys are
    /// added, and for a shared key the newer (memory) value wins.
    #[tokio::test]
    async fn test_cache_merge_and_size_management() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing cache merge functionality and size management");

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("merge_test.json");

        // First "process": seed the disk file.
        {
            let manager = CacheManager::new(5, cache_path.clone()).unwrap();
            manager.set("old_key1".to_string(), serde_json::json!({"data": "old1"})).await;
            manager.set("shared_key".to_string(), serde_json::json!({"data": "old_shared"})).await;
            manager.save_to_disk().await.unwrap();
        }
        // Ensure manager2's timestamps are strictly newer than the disk copy's.
        sleep(Duration::from_secs(1)).await;

        let manager2 = CacheManager::new(5, cache_path.clone()).unwrap();
        manager2.set("new_key1".to_string(), serde_json::json!({"data": "new1"})).await;
        manager2.set("new_key2".to_string(), serde_json::json!({"data": "new2"})).await;
        manager2.set("shared_key".to_string(), serde_json::json!({"data": "new_shared"})).await;

        manager2.save_to_disk().await.unwrap();

        let manager3 = CacheManager::new(10, cache_path).unwrap();

        assert!(manager3.get("old_key1").await.is_some());
        assert!(manager3.get("new_key1").await.is_some());
        assert!(manager3.get("new_key2").await.is_some());

        // Memory value (newer accessed_at) must have won the collision.
        let shared_value = manager3.get("shared_key").await.unwrap();
        assert_eq!(shared_value["data"], "new_shared");

        info!("Cache merge test completed successfully");
    }

    /// When the disk copy is larger than the in-memory view, the merged file
    /// is capped at the original disk size (5), not the smaller max_items.
    #[tokio::test]
    async fn test_size_management_disk_larger() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing size management when disk cache is larger");

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("size_test.json");

        // Seed 5 entries on disk from a roomy manager.
        {
            let manager = CacheManager::new(10, cache_path.clone()).unwrap();
            for i in 0..5 {
                manager.set(format!("disk_key_{}", i), serde_json::json!({"data": i})).await;
            }
            manager.save_to_disk().await.unwrap();
        }

        // Smaller manager (max 3) adds memory entries, then saves.
        let manager2 = CacheManager::new(3, cache_path.clone()).unwrap();
        manager2.set("memory_key_1".to_string(), serde_json::json!({"data": "mem1"})).await;
        manager2.set("memory_key_2".to_string(), serde_json::json!({"data": "mem2"})).await;

        manager2.save_to_disk().await.unwrap();

        let manager3 = CacheManager::new(10, cache_path).unwrap();
        let all_entries = manager3.get_all_entries().await;
        assert_eq!(all_entries.len(), 5);
        info!("Size management test completed - disk cache size respected");
    }

    /// When the memory view is larger than the disk copy, the merged file may
    /// grow up to the saving manager's max_items (6 here: 2 disk + 4 memory).
    #[tokio::test]
    async fn test_size_management_memory_larger() {
        edb_common::logging::ensure_test_logging(None);
        info!("Testing size management when memory cache is larger");

        let temp_dir = TempDir::new().unwrap();
        let cache_path = temp_dir.path().join("size_test2.json");

        // Seed 2 entries on disk.
        {
            let manager = CacheManager::new(10, cache_path.clone()).unwrap();
            manager.set("disk_key_1".to_string(), serde_json::json!({"data": "disk1"})).await;
            manager.set("disk_key_2".to_string(), serde_json::json!({"data": "disk2"})).await;
            manager.save_to_disk().await.unwrap();
        }

        let manager2 = CacheManager::new(6, cache_path.clone()).unwrap();
        for i in 0..4 {
            manager2
                .set(format!("memory_key_{}", i), serde_json::json!({"data": format!("mem{}", i)}))
                .await;
        }

        manager2.save_to_disk().await.unwrap();

        let manager3 = CacheManager::new(10, cache_path).unwrap();
        let all_entries = manager3.get_all_entries().await;
        assert_eq!(all_entries.len(), 6);
        info!("Size management test completed - cache growth allowed");
    }
}