use crate::config::AppConfig;
use crate::metrics::{CACHE_HIT_COUNTER, CACHE_MISS_COUNTER, REQUEST_COUNTER};
use lru::LruCache;
use once_cell::sync::Lazy;
use prometheus::IntCounter;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_commitment_config::CommitmentConfig;
use solana_sdk::pubkey::Pubkey;
use std::num::NonZeroUsize;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::Instant;
/// A single cache entry: a rendered RPC response plus the moment it stops being valid.
#[derive(Debug, Clone)]
struct CachedResponse {
/// The response text that is handed back to callers on a cache hit.
value: String,
/// Point in time after which this entry is treated as expired and must be refetched.
expires_at: Instant,
}
/// Reference-counted, async-mutex-guarded LRU cache mapping a request key to its cached response.
type SharedCache = Arc<Mutex<LruCache<String, CachedResponse>>>;
/// Prometheus counter `rpc_upstream_requests_total` for upstream RPC calls made to Solana.
///
/// The counter is created and registered lazily, on first access. It is incremented by
/// `fetch_from_solana`.
///
/// # Panics
///
/// The first access panics if registering the counter fails (see the `expect` below).
pub static UPSTREAM_REQUEST_COUNTER: Lazy<IntCounter> = Lazy::new(|| {
prometheus::register_int_counter!(
"rpc_upstream_requests_total",
"Total number of upstream RPC calls made to Solana"
)
.expect("failed to register rpc_upstream_requests_total")
});
/// Spawns a detached background task that polls the default RPC keys through the cache.
///
/// A fresh LRU cache holding `config.cache_size` entries is created and handed to the task,
/// which requests every key from `default_keys()`, prints each response, sleeps for five
/// seconds and then repeats indefinitely. This function returns as soon as the task has been
/// spawned; the task's `JoinHandle` is dropped, so the task keeps running on its own.
///
/// # Errors
///
/// Returns an error when `config.cache_size` is zero.
pub async fn start_rpc_cache(config: &AppConfig) -> anyhow::Result<()> {
    println!(" Starting RPC LRU cache (size: {})", config.cache_size);

    // An LRU cache cannot have zero capacity, so reject that before spawning anything.
    let capacity = match NonZeroUsize::new(config.cache_size) {
        Some(capacity) => capacity,
        None => return Err(anyhow::anyhow!("cache_size must be > 0")),
    };
    let shared: SharedCache = Arc::new(Mutex::new(LruCache::new(capacity)));
    let endpoint = config.rpc_url.clone();

    // The task takes ownership of the cache handle and the endpoint string.
    tokio::spawn(async move {
        let pause = Duration::from_secs(5);
        loop {
            for key in default_keys() {
                let response = handle_rpc_request(Arc::clone(&shared), key, &endpoint).await;
                println!("→ Response: {}", response);
            }
            tokio::time::sleep(pause).await;
        }
    });

    Ok(())
}
/// Performs a single pass over the default RPC keys using a freshly created cache.
///
/// Each key from `default_keys()` is requested in order through `handle_rpc_request` and the
/// response is printed. The cache lives only for the duration of this call.
///
/// # Errors
///
/// Returns an error when `config.cache_size` is zero.
pub async fn run_rpc_cache_once(config: &AppConfig) -> anyhow::Result<()> {
    println!(
        " Running RPC cache once (size: {}, rpc_url: {})",
        config.cache_size, config.rpc_url
    );

    // An LRU cache cannot have zero capacity.
    let capacity = NonZeroUsize::new(config.cache_size)
        .ok_or_else(|| anyhow::anyhow!("cache_size must be > 0"))?;
    let shared: SharedCache = Arc::new(Mutex::new(LruCache::new(capacity)));

    for request_key in default_keys() {
        // Everything runs inside this call, so the URL can simply be borrowed from `config`.
        let response =
            handle_rpc_request(Arc::clone(&shared), request_key, &config.rpc_url).await;
        println!("→ Response: {}", response);
    }

    Ok(())
}
/// Returns the fixed list of request keys that the cache drivers poll, in polling order.
fn default_keys() -> Vec<String> {
    const KEYS: [&str; 2] = [
        "getBalance:9Vpj7yMy7V7ojAB8BoS5efZLTi3kWJv3bXWQ7vLxB4vG",
        "getEpochInfo",
    ];

    KEYS.iter().map(|key| String::from(*key)).collect()
}
/// Serves the request identified by `key`, from the cache when possible.
///
/// Every call increments `REQUEST_COUNTER`. If the cache holds an unexpired entry for `key`,
/// `CACHE_HIT_COUNTER` is incremented and the stored value is returned prefixed with
/// `"(cached) "`. Otherwise `CACHE_MISS_COUNTER` is incremented and the value is fetched with
/// `fetch_from_solana`.
///
/// A successful fetch is stored for ten seconds and returned as-is. A failed fetch is returned
/// as `"Error: <cause>"` and is deliberately *not* stored, so a transient upstream failure is
/// retried on the next request instead of being replayed from the cache as a hit.
async fn handle_rpc_request(cache: SharedCache, key: String, rpc_url: &str) -> String {
    /// How long a successfully fetched response stays valid.
    const CACHE_TTL: Duration = Duration::from_secs(10);

    REQUEST_COUNTER.inc();

    // Scope the guard so the lock is released before the (slow) upstream call below.
    {
        let mut cache_lock = cache.lock().await;
        match cache_lock.get(&key) {
            Some(entry) if Instant::now() < entry.expires_at => {
                println!(" Cache hit for {}", key);
                CACHE_HIT_COUNTER.inc();
                return format!("(cached) {}", entry.value);
            }
            Some(_) => {
                println!(" Cache expired for {}", key);
                CACHE_MISS_COUNTER.inc();
            }
            None => {
                println!(" Cache miss for {}", key);
                CACHE_MISS_COUNTER.inc();
            }
        }
    }

    match fetch_from_solana(&key, rpc_url).await {
        Ok(fresh_value) => {
            let new_entry = CachedResponse {
                value: fresh_value.clone(),
                expires_at: Instant::now() + CACHE_TTL,
            };
            // The temporary guard is dropped at the end of this statement.
            cache.lock().await.put(key, new_entry);
            fresh_value
        }
        // Report the failure to the caller, but leave the cache untouched.
        Err(e) => format!("Error: {}", e),
    }
}
/// Executes the RPC described by `key` against the node at `rpc_url` and renders the result.
///
/// Supported keys:
/// - `getBalance:<pubkey>`: fetches the balance of `<pubkey>` and reports it in lamports.
/// - `getEpochInfo`: fetches the current epoch, absolute slot and block height.
///
/// Any other key yields `Ok("Unsupported RPC key")` without contacting the node.
///
/// `UPSTREAM_REQUEST_COUNTER` is incremented only immediately before a request is actually
/// sent, so unsupported keys and malformed public keys do not inflate the upstream-call metric.
/// Requests use the `confirmed` commitment level.
///
/// # Errors
///
/// Returns an error when the public key in a `getBalance:` key cannot be parsed, or when the
/// RPC call itself fails.
async fn fetch_from_solana(key: &str, rpc_url: &str) -> anyhow::Result<String> {
    // Built on demand so keys that never reach the network do not construct a client.
    let connect =
        || RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed());

    if let Some(pubkey_str) = key.strip_prefix("getBalance:") {
        // Validate locally first: a malformed pubkey must not count as an upstream call.
        let pubkey = pubkey_str.parse::<Pubkey>()?;
        UPSTREAM_REQUEST_COUNTER.inc();
        let lamports = connect().get_balance(&pubkey).await?;
        Ok(format!("Balance for {}: {} lamports", pubkey_str, lamports))
    } else if key == "getEpochInfo" {
        UPSTREAM_REQUEST_COUNTER.inc();
        let epoch_info = connect().get_epoch_info().await?;
        Ok(format!(
            "Epoch: {}, Slot: {}, Block height: {}",
            epoch_info.epoch, epoch_info.absolute_slot, epoch_info.block_height
        ))
    } else {
        Ok("Unsupported RPC key".to_string())
    }
}