Skip to main content

solana_validator_optimizer/
rpc_cache_layer.rs

1use crate::config::AppConfig;
2use crate::metrics::{CACHE_HIT_COUNTER, CACHE_MISS_COUNTER, REQUEST_COUNTER};
3use lru::LruCache;
4use once_cell::sync::Lazy;
5use prometheus::IntCounter;
6use solana_client::nonblocking::rpc_client::RpcClient;
7use solana_commitment_config::CommitmentConfig;
8use solana_sdk::pubkey::Pubkey;
9use std::num::NonZeroUsize;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::Mutex;
13use tokio::time::Instant;
14
15#[derive(Debug, Clone)]
16struct CachedResponse {
17    value: String,
18    expires_at: Instant,
19}
20
21type SharedCache = Arc<Mutex<LruCache<String, CachedResponse>>>;
22
23/// Optional but useful: counts actual upstream RPC calls to Solana.
24/// This is different from REQUEST_COUNTER which counts requests handled by this layer.
25pub static UPSTREAM_REQUEST_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
26    prometheus::register_int_counter!(
27        "rpc_upstream_requests_total",
28        "Total number of upstream RPC calls made to Solana"
29    )
30    .expect("failed to register rpc_upstream_requests_total")
31});
32
33/// Starts the background RPC cache loop (spawns a task and returns immediately).
34pub async fn start_rpc_cache(config: &AppConfig) -> anyhow::Result<()> {
35    println!(" Starting RPC LRU cache (size: {})", config.cache_size);
36
37    let cache: SharedCache = Arc::new(Mutex::new(LruCache::new(
38        NonZeroUsize::new(config.cache_size)
39            .ok_or_else(|| anyhow::anyhow!("cache_size must be > 0"))?,
40    )));
41
42    let rpc_url = config.rpc_url.clone();
43    let cache_clone = cache.clone();
44
45    tokio::spawn(async move {
46        loop {
47            let keys = default_keys();
48
49            for key in keys {
50                let result = handle_rpc_request(cache_clone.clone(), key, &rpc_url).await;
51                println!("→ Response: {}", result);
52            }
53
54            tokio::time::sleep(Duration::from_secs(5)).await;
55        }
56    });
57
58    Ok(())
59}
60
61/// Runs one cache cycle (warm-up) and exits.
62/// Intended for: `svo rpc-cache --once` (cronjob/init-container use).
63pub async fn run_rpc_cache_once(config: &AppConfig) -> anyhow::Result<()> {
64    println!(
65        " Running RPC cache once (size: {}, rpc_url: {})",
66        config.cache_size, config.rpc_url
67    );
68
69    let cache: SharedCache = Arc::new(Mutex::new(LruCache::new(
70        NonZeroUsize::new(config.cache_size)
71            .ok_or_else(|| anyhow::anyhow!("cache_size must be > 0"))?,
72    )));
73
74    let rpc_url = config.rpc_url.clone();
75    let keys = default_keys();
76
77    for key in keys {
78        let result = handle_rpc_request(cache.clone(), key, &rpc_url).await;
79        println!("→ Response: {}", result);
80    }
81
82    Ok(())
83}
84
85fn default_keys() -> Vec<String> {
86    vec![
87        "getBalance:9Vpj7yMy7V7ojAB8BoS5efZLTi3kWJv3bXWQ7vLxB4vG".to_string(),
88        "getEpochInfo".to_string(),
89    ]
90}
91
92/// Handles a single request:
93/// - increments REQUEST_COUNTER for every request handled (hit or miss)
94/// - increments HIT/MISS counters appropriately
95/// - fetches from Solana only on miss/expired
96async fn handle_rpc_request(cache: SharedCache, key: String, rpc_url: &str) -> String {
97    // This is the most intuitive definition: "handled by this layer"
98    REQUEST_COUNTER.inc();
99
100    // 1) Fast path: check cache without holding lock during I/O
101    {
102        let mut cache_lock = cache.lock().await;
103
104        if let Some(entry) = cache_lock.get(&key) {
105            if Instant::now() < entry.expires_at {
106                println!(" Cache hit for {}", key);
107                CACHE_HIT_COUNTER.inc();
108                return format!("(cached) {}", entry.value);
109            }
110
111            // expired
112            println!(" Cache expired for {}", key);
113            CACHE_MISS_COUNTER.inc();
114        } else {
115            println!(" Cache miss for {}", key);
116            CACHE_MISS_COUNTER.inc();
117        }
118    } // lock dropped here
119
120    // 2) Miss path: fetch without holding cache lock
121    let fresh_value = match fetch_from_solana(&key, rpc_url).await {
122        Ok(val) => val,
123        Err(e) => format!("Error: {}", e),
124    };
125
126    // 3) Store result back in cache
127    {
128        let mut cache_lock = cache.lock().await;
129
130        let new_entry = CachedResponse {
131            value: fresh_value.clone(),
132            expires_at: Instant::now() + Duration::from_secs(10),
133        };
134
135        cache_lock.put(key, new_entry);
136    }
137
138    fresh_value
139}
140
141async fn fetch_from_solana(key: &str, rpc_url: &str) -> anyhow::Result<String> {
142    // Counts only real upstream calls to Solana.
143    UPSTREAM_REQUEST_COUNTER.inc();
144
145    let client =
146        RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed());
147
148    if key.starts_with("getBalance:") {
149        let pubkey_str = key
150            .strip_prefix("getBalance:")
151            .ok_or_else(|| anyhow::anyhow!("invalid getBalance key format"))?;
152
153        let pubkey = pubkey_str.parse::<Pubkey>()?;
154        let lamports = client.get_balance(&pubkey).await?;
155        Ok(format!("Balance for {}: {} lamports", pubkey_str, lamports))
156    } else if key == "getEpochInfo" {
157        let epoch_info = client.get_epoch_info().await?;
158        Ok(format!(
159            "Epoch: {}, Slot: {}, Block height: {}",
160            epoch_info.epoch, epoch_info.absolute_slot, epoch_info.block_height
161        ))
162    } else {
163        Ok("Unsupported RPC key".to_string())
164    }
165}