Skip to main content

solana_validator_optimizer/
rpc_cache_layer.rs

1use crate::config::AppConfig;
2use crate::metrics::{REQUEST_COUNTER, CACHE_HIT_COUNTER, CACHE_MISS_COUNTER}; //  Import all counters
3use lru::LruCache;
4use solana_client::nonblocking::rpc_client::RpcClient;
5use solana_sdk::pubkey::Pubkey; 
6use solana_commitment_config::CommitmentConfig;
7use std::num::NonZeroUsize;
8use std::sync::Arc;
9use std::time::Duration;
10use tokio::sync::Mutex;
11use tokio::time::Instant;
12
13#[derive(Debug)]
14struct CachedResponse {
15    value: String,
16    expires_at: Instant,
17}
18
19type SharedCache = Arc<Mutex<LruCache<String, CachedResponse>>>;
20
21pub async fn start_rpc_cache(config: &AppConfig) -> anyhow::Result<()> {
22    println!(" Starting RPC LRU cache (size: {})", config.cache_size);
23
24    let cache: SharedCache = Arc::new(Mutex::new(LruCache::new(
25        NonZeroUsize::new(config.cache_size).unwrap(),
26    )));
27
28    let rpc_url = config.rpc_url.clone();
29    let cache_clone = cache.clone();
30
31    tokio::spawn(async move {
32        loop {
33            let keys = vec![
34                "getBalance:9Vpj7yMy7V7ojAB8BoS5efZLTi3kWJv3bXWQ7vLxB4vG".to_string(),
35                "getEpochInfo".to_string(),
36            ];
37
38            for key in keys {
39                let result = tokio::task::spawn_blocking({
40                    let cache = cache_clone.clone();
41                    let rpc_url = rpc_url.clone();
42                    let key = key.clone();
43                    move || {
44                        let rt = tokio::runtime::Runtime::new().unwrap();
45                        rt.block_on(handle_rpc_request(cache, key, rpc_url))
46                    }
47                })
48                .await
49                .unwrap_or_else(|e| format!("Spawn failed: {}", e));
50
51                println!("→ Response: {}", result);
52            }
53
54            tokio::time::sleep(Duration::from_secs(5)).await;
55        }
56    });
57
58    Ok(())
59}
60
61async fn handle_rpc_request(
62    cache: SharedCache,
63    key: String,
64    rpc_url: String,
65) -> String {
66    let mut cache_lock = cache.lock().await;
67
68    if let Some(entry) = cache_lock.get(&key) {
69        if Instant::now() < entry.expires_at {
70            println!(" Cache hit for {}", key);
71            CACHE_HIT_COUNTER.inc(); //  Record hit
72            return format!("(cached) {}", entry.value);
73        } else {
74            println!(" Cache expired for {}", key);
75        }
76    } else {
77        println!(" Cache miss for {}", key);
78        CACHE_MISS_COUNTER.inc(); //  Record miss
79    }
80
81    let fresh_value = match fetch_from_solana(&key, &rpc_url).await {
82        Ok(val) => val,
83        Err(e) => format!("Error: {}", e),
84    };
85
86    let new_entry = CachedResponse {
87        value: fresh_value.clone(),
88        expires_at: Instant::now() + Duration::from_secs(10),
89    };
90    cache_lock.put(key.clone(), new_entry);
91
92    fresh_value
93}
94
95async fn fetch_from_solana(key: &str, rpc_url: &str) -> anyhow::Result<String> {
96    REQUEST_COUNTER.inc(); //  Track actual RPC request
97
98    let client = RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed());
99
100    if key.starts_with("getBalance:") {
101        let pubkey_str = key.strip_prefix("getBalance:").unwrap();
102        let pubkey = pubkey_str.parse::<Pubkey>()?;
103        let lamports = client.get_balance(&pubkey).await?;
104        Ok(format!("Balance for {}: {} lamports", pubkey_str, lamports))
105    } else if key == "getEpochInfo" {
106        let epoch_info = client.get_epoch_info().await?;
107        Ok(format!(
108            "Epoch: {}, Slot: {}, Block height: {}",
109            epoch_info.epoch, epoch_info.absolute_slot, epoch_info.block_height
110        ))
111    } else {
112        Ok("Unsupported RPC key".to_string())
113    }
114}
115