1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
// Connect to future leaders with some jitter so the quic connection is warm
// by the time we need it.

use {
    rand::{thread_rng, Rng},
    solana_client::{
        connection_cache::{ConnectionCache, Protocol},
        tpu_connection::TpuConnection,
    },
    solana_gossip::cluster_info::ClusterInfo,
    solana_poh::poh_recorder::PohRecorder,
    std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, RwLock,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::Duration,
    },
};

pub struct WarmQuicCacheService {
    thread_hdl: JoinHandle<()>,
}

// ~50 seconds
const CACHE_OFFSET_SLOT: i64 = 100;
const CACHE_JITTER_SLOT: i64 = 20;

impl WarmQuicCacheService {
    pub fn new(
        connection_cache: Arc<ConnectionCache>,
        cluster_info: Arc<ClusterInfo>,
        poh_recorder: Arc<RwLock<PohRecorder>>,
        exit: Arc<AtomicBool>,
    ) -> Self {
        assert!(matches!(*connection_cache, ConnectionCache::Quic(_)));
        let thread_hdl = Builder::new()
            .name("solWarmQuicSvc".to_string())
            .spawn(move || {
                let slot_jitter = thread_rng().gen_range(-CACHE_JITTER_SLOT..CACHE_JITTER_SLOT);
                let mut maybe_last_leader = None;
                while !exit.load(Ordering::Relaxed) {
                    let leader_pubkey =  poh_recorder
                        .read()
                        .unwrap()
                        .leader_after_n_slots((CACHE_OFFSET_SLOT + slot_jitter) as u64);
                    if let Some(leader_pubkey) = leader_pubkey {
                        if maybe_last_leader
                            .map_or(true, |last_leader| last_leader != leader_pubkey)
                        {
                            maybe_last_leader = Some(leader_pubkey);
                            if let Some(Ok(addr)) = cluster_info
                                .lookup_contact_info(&leader_pubkey, |node| node.tpu(Protocol::QUIC))
                            {
                                let conn = connection_cache.get_connection(&addr);
                                if let Err(err) = conn.send_data(&[]) {
                                    warn!(
                                        "Failed to warmup QUIC connection to the leader {:?}, Error {:?}",
                                        leader_pubkey, err
                                    );
                                }
                            }
                        }
                    }
                    sleep(Duration::from_millis(200));
                }
            })
            .unwrap();
        Self { thread_hdl }
    }

    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }
}