use {
rand::{Rng, rng},
solana_client::connection_cache::{ConnectionCache, Protocol},
solana_connection_cache::client_connection::ClientConnection as TpuConnection,
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfoQuery},
solana_poh::poh_recorder::PohRecorder,
solana_pubkey::Pubkey,
std::{
net::SocketAddr,
sync::{
Arc, RwLock,
atomic::{AtomicBool, Ordering},
},
thread::{self, Builder, JoinHandle, sleep},
time::Duration,
},
};
/// Service wrapping the background thread started by [`WarmQuicCacheService::new`].
///
/// The thread periodically looks up an upcoming leader and sends an empty
/// payload to it through the configured connection caches.
pub struct WarmQuicCacheService {
/// Handle of the spawned `solWarmQuicSvc` thread; consumed by `join`.
thread_hdl: JoinHandle<()>,
}
/// Base look-ahead, in slots: the service thread asks the PoH recorder for the
/// leader `CACHE_OFFSET_SLOT + jitter` slots ahead.
const CACHE_OFFSET_SLOT: i64 = 100;
/// Bound of the random jitter, in slots, added to `CACHE_OFFSET_SLOT`. The
/// jitter is drawn once per service thread from the half-open range
/// `-CACHE_JITTER_SLOT..CACHE_JITTER_SLOT`.
const CACHE_JITTER_SLOT: i64 = 20;
impl WarmQuicCacheService {
/// Sends an empty payload to `leader_pubkey` through `cache`.
///
/// The destination address is obtained by running `contact_info_selector`
/// against the leader's entry in `cluster_info`. The call is a no-op when
/// `cache` is `None`, or when the lookup does not yield an address. A failed
/// send is only logged (tagged with `log_context`); nothing is returned to
/// the caller.
fn warmup_connection(
    cache: Option<&ConnectionCache>,
    cluster_info: &ClusterInfo,
    leader_pubkey: &Pubkey,
    contact_info_selector: impl ContactInfoQuery<Option<SocketAddr>>,
    log_context: &str,
) {
    // Nothing to warm when this cache was not configured.
    let Some(connection_cache) = cache else {
        return;
    };
    // Outer `None`: no contact info for the leader; inner `None`: the
    // selector found no address in it. Either way there is nowhere to send.
    let Some(Some(addr)) = cluster_info.lookup_contact_info(leader_pubkey, contact_info_selector)
    else {
        return;
    };

    let conn = connection_cache.get_connection(&addr);
    let send_result = conn.send_data(&[]);
    if let Err(err) = send_result {
        warn!(
            "Failed to warmup QUIC connection to the leader {leader_pubkey:?} at \
             {addr:?}, Context: {log_context}, Error: {err:?}"
        );
    }
}
/// Validates the supplied caches and spawns the `solWarmQuicSvc` thread.
///
/// The thread draws one random slot jitter from
/// `-CACHE_JITTER_SLOT..CACHE_JITTER_SLOT` when it starts. Until `exit` is
/// set, it then asks `poh_recorder` every 200 ms for the leader
/// `CACHE_OFFSET_SLOT + jitter` slots ahead. Whenever that leader differs
/// from the one seen on the previous change, it calls `warmup_connection`
/// for the TPU cache and then for the vote cache.
///
/// # Panics
///
/// Panics if either cache is `Some` but not the `ConnectionCache::Quic`
/// variant, or if the thread cannot be spawned.
pub fn new(
    tpu_connection_cache: Option<Arc<ConnectionCache>>,
    vote_connection_cache: Option<Arc<ConnectionCache>>,
    cluster_info: Arc<ClusterInfo>,
    poh_recorder: Arc<RwLock<PohRecorder>>,
    exit: Arc<AtomicBool>,
) -> Self {
    assert!(matches!(
        tpu_connection_cache.as_deref(),
        None | Some(ConnectionCache::Quic(_))
    ));
    assert!(matches!(
        vote_connection_cache.as_deref(),
        None | Some(ConnectionCache::Quic(_))
    ));

    let warmup_loop = move || {
        const POLL_INTERVAL: Duration = Duration::from_millis(200);

        // The jitter is drawn once, so the look-ahead stays fixed for the
        // lifetime of this thread.
        let slot_jitter = rng().random_range(-CACHE_JITTER_SLOT..CACHE_JITTER_SLOT);
        // The sum lies in 80..120 given the constants above, so the cast
        // to `u64` cannot wrap.
        let lookahead_slots = (CACHE_OFFSET_SLOT + slot_jitter) as u64;
        let mut last_seen_leader = None;

        loop {
            if exit.load(Ordering::Relaxed) {
                break;
            }

            // Kept as its own `let` statement so the read guard is dropped
            // at the end of the statement, before any warmup call runs.
            let upcoming_leader = poh_recorder
                .read()
                .unwrap()
                .leader_after_n_slots(lookahead_slots);

            match upcoming_leader {
                Some(leader) if last_seen_leader != Some(leader) => {
                    // Recorded before warming: each leader change triggers
                    // exactly one warmup attempt per cache.
                    last_seen_leader = Some(leader);
                    Self::warmup_connection(
                        tpu_connection_cache.as_deref(),
                        &cluster_info,
                        &leader,
                        |node| node.tpu(Protocol::QUIC),
                        "tpu",
                    );
                    Self::warmup_connection(
                        vote_connection_cache.as_deref(),
                        &cluster_info,
                        &leader,
                        |node| node.tpu_vote(Protocol::QUIC),
                        "vote",
                    );
                }
                // Leader unknown or unchanged: nothing to do this round.
                _ => {}
            }

            sleep(POLL_INTERVAL);
        }
    };

    let thread_hdl = Builder::new()
        .name(String::from("solWarmQuicSvc"))
        .spawn(warmup_loop)
        .unwrap();

    Self { thread_hdl }
}
pub fn join(self) -> thread::Result<()> {
self.thread_hdl.join()
}
}