solana_validator_optimizer/
rpc_cache_layer.rs1use crate::config::AppConfig;
2use crate::metrics::{REQUEST_COUNTER, CACHE_HIT_COUNTER, CACHE_MISS_COUNTER}; use lru::LruCache;
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_sdk::{pubkey::Pubkey, commitment_config::CommitmentConfig};
6use std::num::NonZeroUsize;
7use std::sync::Arc;
8use std::time::Duration;
9use tokio::sync::Mutex;
10use tokio::time::Instant;
11
12#[derive(Debug)]
13struct CachedResponse {
14 value: String,
15 expires_at: Instant,
16}
17
18type SharedCache = Arc<Mutex<LruCache<String, CachedResponse>>>;
19
20pub async fn start_rpc_cache(config: &AppConfig) -> anyhow::Result<()> {
21 println!(" Starting RPC LRU cache (size: {})", config.cache_size);
22
23 let cache: SharedCache = Arc::new(Mutex::new(LruCache::new(
24 NonZeroUsize::new(config.cache_size).unwrap(),
25 )));
26
27 let rpc_url = config.rpc_url.clone();
28 let cache_clone = cache.clone();
29
30 tokio::spawn(async move {
31 loop {
32 let keys = vec![
33 "getBalance:9Vpj7yMy7V7ojAB8BoS5efZLTi3kWJv3bXWQ7vLxB4vG".to_string(),
34 "getEpochInfo".to_string(),
35 ];
36
37 for key in keys {
38 let result = tokio::task::spawn_blocking({
39 let cache = cache_clone.clone();
40 let rpc_url = rpc_url.clone();
41 let key = key.clone();
42 move || {
43 let rt = tokio::runtime::Runtime::new().unwrap();
44 rt.block_on(handle_rpc_request(cache, key, rpc_url))
45 }
46 })
47 .await
48 .unwrap_or_else(|e| format!("Spawn failed: {}", e));
49
50 println!("→ Response: {}", result);
51 }
52
53 tokio::time::sleep(Duration::from_secs(5)).await;
54 }
55 });
56
57 Ok(())
58}
59
60async fn handle_rpc_request(
61 cache: SharedCache,
62 key: String,
63 rpc_url: String,
64) -> String {
65 let mut cache_lock = cache.lock().await;
66
67 if let Some(entry) = cache_lock.get(&key) {
68 if Instant::now() < entry.expires_at {
69 println!(" Cache hit for {}", key);
70 CACHE_HIT_COUNTER.inc(); return format!("(cached) {}", entry.value);
72 } else {
73 println!(" Cache expired for {}", key);
74 }
75 } else {
76 println!(" Cache miss for {}", key);
77 CACHE_MISS_COUNTER.inc(); }
79
80 let fresh_value = match fetch_from_solana(&key, &rpc_url).await {
81 Ok(val) => val,
82 Err(e) => format!("Error: {}", e),
83 };
84
85 let new_entry = CachedResponse {
86 value: fresh_value.clone(),
87 expires_at: Instant::now() + Duration::from_secs(10),
88 };
89 cache_lock.put(key.clone(), new_entry);
90
91 fresh_value
92}
93
94async fn fetch_from_solana(key: &str, rpc_url: &str) -> anyhow::Result<String> {
95 REQUEST_COUNTER.inc(); let client = RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed());
98
99 if key.starts_with("getBalance:") {
100 let pubkey_str = key.strip_prefix("getBalance:").unwrap();
101 let pubkey = pubkey_str.parse::<Pubkey>()?;
102 let lamports = client.get_balance(&pubkey).await?;
103 Ok(format!("Balance for {}: {} lamports", pubkey_str, lamports))
104 } else if key == "getEpochInfo" {
105 let epoch_info = client.get_epoch_info().await?;
106 Ok(format!(
107 "Epoch: {}, Slot: {}, Block height: {}",
108 epoch_info.epoch, epoch_info.absolute_slot, epoch_info.block_height
109 ))
110 } else {
111 Ok("Unsupported RPC key".to_string())
112 }
113}
114