solana_core/
cluster_slots_service.rs

1pub mod cluster_slots;
2pub mod slot_supporters;
3use {
4    cluster_slots::ClusterSlots,
5    crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
6    solana_clock::Slot,
7    solana_gossip::{cluster_info::ClusterInfo, epoch_specs::EpochSpecs},
8    solana_ledger::blockstore::Blockstore,
9    solana_measure::measure::Measure,
10    solana_runtime::bank_forks::BankForks,
11    std::{
12        sync::{
13            atomic::{AtomicBool, Ordering},
14            Arc, RwLock,
15        },
16        thread::{self, Builder, JoinHandle},
17        time::{Duration, Instant},
18    },
19};
20
21pub type ClusterSlotsUpdateReceiver = Receiver<Vec<Slot>>;
22pub type ClusterSlotsUpdateSender = Sender<Vec<Slot>>;
23
/// Running totals, in microseconds, of time spent in the service loop; the loop reports them
/// as a metrics datapoint and then resets them.
#[derive(Default, Debug)]
struct ClusterSlotsServiceTiming {
    /// Time spent reading the blockstore's lowest slot and pushing it to gossip.
    pub lowest_slot_elapsed: u64,
    /// Time spent handling received slot updates and refreshing cluster slots from the root bank.
    pub process_cluster_slots_updates_elapsed: u64,
}

impl ClusterSlotsServiceTiming {
    /// Adds one loop iteration's measurements to the running totals.
    fn update(&mut self, lowest_slot_elapsed: u64, process_cluster_slots_updates_elapsed: u64) {
        let Self {
            lowest_slot_elapsed: lowest_slot_total,
            process_cluster_slots_updates_elapsed: process_updates_total,
        } = self;
        *lowest_slot_total += lowest_slot_elapsed;
        *process_updates_total += process_cluster_slots_updates_elapsed;
    }
}
36
/// Owns the background `solClusterSlots` thread, which publishes this node's lowest slot and
/// `EpochSlots` to gossip and periodically refreshes the shared `ClusterSlots`.
pub struct ClusterSlotsService {
    /// Handle of the spawned service thread; consumed by `join`.
    t_cluster_slots_service: JoinHandle<()>,
}
40
41impl ClusterSlotsService {
42    pub fn new(
43        blockstore: Arc<Blockstore>,
44        cluster_slots: Arc<ClusterSlots>,
45        bank_forks: Arc<RwLock<BankForks>>,
46        cluster_info: Arc<ClusterInfo>,
47        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
48        exit: Arc<AtomicBool>,
49    ) -> Self {
50        Self::initialize_lowest_slot(&blockstore, &cluster_info);
51        Self::initialize_epoch_slots(&bank_forks, &cluster_info);
52        let t_cluster_slots_service = Builder::new()
53            .name("solClusterSlots".to_string())
54            .spawn(move || {
55                Self::run(
56                    blockstore,
57                    cluster_slots,
58                    bank_forks,
59                    cluster_info,
60                    cluster_slots_update_receiver,
61                    exit,
62                )
63            })
64            .unwrap();
65
66        ClusterSlotsService {
67            t_cluster_slots_service,
68        }
69    }
70
71    pub fn join(self) -> thread::Result<()> {
72        self.t_cluster_slots_service.join()
73    }
74
75    fn run(
76        blockstore: Arc<Blockstore>,
77        cluster_slots: Arc<ClusterSlots>,
78        bank_forks: Arc<RwLock<BankForks>>,
79        cluster_info: Arc<ClusterInfo>,
80        cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
81        exit: Arc<AtomicBool>,
82    ) {
83        let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
84        let mut last_stats = Instant::now();
85        let mut epoch_specs = EpochSpecs::from(bank_forks.clone());
86
87        // initialize cluster slots with the current root bank
88        let root_bank = bank_forks.read().unwrap().root_bank();
89        cluster_slots.update(&root_bank, &cluster_info);
90
91        while !exit.load(Ordering::Relaxed) {
92            let slots = match cluster_slots_update_receiver.recv_timeout(Duration::from_millis(200))
93            {
94                Ok(slots) => Some(slots),
95                Err(RecvTimeoutError::Timeout) => None,
96                Err(RecvTimeoutError::Disconnected) => {
97                    warn!("Cluster slots service - sender disconnected");
98                    break;
99                }
100            };
101            let mut lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
102            let lowest_slot = blockstore.lowest_slot();
103            Self::update_lowest_slot(lowest_slot, &cluster_info);
104            lowest_slot_elapsed.stop();
105            let mut process_cluster_slots_updates_elapsed =
106                Measure::start("process_cluster_slots_updates_elapsed");
107
108            if let Some(slots) = slots {
109                let node_id = cluster_info.id();
110                let my_stake = epoch_specs
111                    .current_epoch_staked_nodes()
112                    .get(&node_id)
113                    .cloned()
114                    .unwrap_or_default();
115                // staked node should push EpochSlots into CRDS to save gossip bandwidth
116                if my_stake > 0 {
117                    Self::process_cluster_slots_updates(
118                        slots,
119                        &cluster_slots_update_receiver,
120                        &cluster_info,
121                    );
122                } else {
123                    // drain the channel dropping the updates
124                    while cluster_slots_update_receiver.try_recv().is_ok() {}
125                }
126            }
127            let root_bank = bank_forks.read().unwrap().root_bank();
128            cluster_slots.update(&root_bank, &cluster_info);
129            process_cluster_slots_updates_elapsed.stop();
130
131            cluster_slots_service_timing.update(
132                lowest_slot_elapsed.as_us(),
133                process_cluster_slots_updates_elapsed.as_us(),
134            );
135
136            if last_stats.elapsed().as_secs() > 2 {
137                datapoint_info!(
138                    "cluster_slots_service-timing",
139                    (
140                        "lowest_slot_elapsed",
141                        cluster_slots_service_timing.lowest_slot_elapsed,
142                        i64
143                    ),
144                    (
145                        "process_cluster_slots_updates_elapsed",
146                        cluster_slots_service_timing.process_cluster_slots_updates_elapsed,
147                        i64
148                    ),
149                );
150                cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
151                last_stats = Instant::now();
152            }
153        }
154    }
155
156    fn process_cluster_slots_updates(
157        mut slots: Vec<Slot>,
158        cluster_slots_update_receiver: &ClusterSlotsUpdateReceiver,
159        cluster_info: &ClusterInfo,
160    ) {
161        while let Ok(mut more) = cluster_slots_update_receiver.try_recv() {
162            slots.append(&mut more);
163        }
164        #[allow(clippy::stable_sort_primitive)]
165        slots.sort();
166
167        if !slots.is_empty() {
168            cluster_info.push_epoch_slots(&slots);
169        }
170    }
171
172    fn initialize_lowest_slot(blockstore: &Blockstore, cluster_info: &ClusterInfo) {
173        // Safe to set into gossip because by this time, the leader schedule cache should
174        // also be updated with the latest root (done in blockstore_processor) and thus
175        // will provide a schedule to window_service for any incoming shreds up to the
176        // last_confirmed_epoch.
177        cluster_info.push_lowest_slot(blockstore.lowest_slot());
178    }
179
180    fn update_lowest_slot(lowest_slot: Slot, cluster_info: &ClusterInfo) {
181        cluster_info.push_lowest_slot(lowest_slot);
182    }
183
184    fn initialize_epoch_slots(bank_forks: &RwLock<BankForks>, cluster_info: &ClusterInfo) {
185        // TODO: Should probably incorporate slots that were replayed on startup,
186        // and maybe some that were frozen < snapshot root in case validators restart
187        // from newer snapshots and lose history.
188        let mut frozen_bank_slots: Vec<_> = bank_forks
189            .read()
190            .unwrap()
191            .frozen_banks()
192            .map(|(slot, _bank)| slot)
193            .collect();
194        frozen_bank_slots.sort_unstable();
195
196        if !frozen_bank_slots.is_empty() {
197            cluster_info.push_epoch_slots(&frozen_bank_slots);
198        }
199    }
200}
201
#[cfg(test)]
mod test {
    use {
        super::*,
        solana_gossip::{crds_data::LowestSlot, node::Node},
        solana_keypair::Keypair,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
    };

    /// `update_lowest_slot` must make the given slot visible as this node's `LowestSlot` in CRDS.
    #[test]
    fn test_update_lowest_slot() {
        let keypair = Arc::new(Keypair::new());
        let pubkey = keypair.pubkey();
        let node_info = Node::new_localhost_with_pubkey(&pubkey);
        let cluster_info = ClusterInfo::new(node_info.info, keypair, SocketAddrSpace::Unspecified);
        ClusterSlotsService::update_lowest_slot(5, &cluster_info);
        // Flush pending pushes so the value is present in CRDS before we read it.
        cluster_info.flush_push_queue();
        // Copy just the slot out while the read lock is held instead of cloning the whole
        // CRDS value; the guard is released at the end of this statement.
        let lowest = cluster_info
            .gossip
            .crds
            .read()
            .unwrap()
            .get::<&LowestSlot>(pubkey)
            .expect("LowestSlot for this node should be present in CRDS")
            .lowest;
        assert_eq!(lowest, 5);
    }
}