// solana_core/cluster_slots_service.rs
pub mod cluster_slots;
pub mod slot_supporters;
3use {
4 cluster_slots::ClusterSlots,
5 crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
6 solana_clock::Slot,
7 solana_gossip::{cluster_info::ClusterInfo, epoch_specs::EpochSpecs},
8 solana_ledger::blockstore::Blockstore,
9 solana_measure::measure::Measure,
10 solana_runtime::bank_forks::BankForks,
11 std::{
12 sync::{
13 atomic::{AtomicBool, Ordering},
14 Arc, RwLock,
15 },
16 thread::{self, Builder, JoinHandle},
17 time::{Duration, Instant},
18 },
19};
20
21pub type ClusterSlotsUpdateReceiver = Receiver<Vec<Slot>>;
22pub type ClusterSlotsUpdateSender = Sender<Vec<Slot>>;
23
/// Running totals of how long the service loop spent in each of its phases.
///
/// Values are plain sums of whatever the caller passes to [`Self::update`];
/// the service loop feeds it `Measure::as_us()` readings and resets the
/// struct to its default after each metrics report.
#[derive(Default, Debug)]
struct ClusterSlotsServiceTiming {
    /// Total time spent reading and publishing the lowest slot.
    pub lowest_slot_elapsed: u64,
    /// Total time spent processing slot updates.
    pub process_cluster_slots_updates_elapsed: u64,
}

impl ClusterSlotsServiceTiming {
    /// Adds one iteration's measurements to the running totals.
    ///
    /// The sums saturate at `u64::MAX` rather than overflowing, so a metrics
    /// counter can never panic (overflow-checked builds) or silently wrap
    /// (release builds).
    fn update(&mut self, lowest_slot_elapsed: u64, process_cluster_slots_updates_elapsed: u64) {
        self.lowest_slot_elapsed = self.lowest_slot_elapsed.saturating_add(lowest_slot_elapsed);
        self.process_cluster_slots_updates_elapsed = self
            .process_cluster_slots_updates_elapsed
            .saturating_add(process_cluster_slots_updates_elapsed);
    }
}
36
37pub struct ClusterSlotsService {
38 t_cluster_slots_service: JoinHandle<()>,
39}
40
41impl ClusterSlotsService {
42 pub fn new(
43 blockstore: Arc<Blockstore>,
44 cluster_slots: Arc<ClusterSlots>,
45 bank_forks: Arc<RwLock<BankForks>>,
46 cluster_info: Arc<ClusterInfo>,
47 cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
48 exit: Arc<AtomicBool>,
49 ) -> Self {
50 Self::initialize_lowest_slot(&blockstore, &cluster_info);
51 Self::initialize_epoch_slots(&bank_forks, &cluster_info);
52 let t_cluster_slots_service = Builder::new()
53 .name("solClusterSlots".to_string())
54 .spawn(move || {
55 Self::run(
56 blockstore,
57 cluster_slots,
58 bank_forks,
59 cluster_info,
60 cluster_slots_update_receiver,
61 exit,
62 )
63 })
64 .unwrap();
65
66 ClusterSlotsService {
67 t_cluster_slots_service,
68 }
69 }
70
71 pub fn join(self) -> thread::Result<()> {
72 self.t_cluster_slots_service.join()
73 }
74
75 fn run(
76 blockstore: Arc<Blockstore>,
77 cluster_slots: Arc<ClusterSlots>,
78 bank_forks: Arc<RwLock<BankForks>>,
79 cluster_info: Arc<ClusterInfo>,
80 cluster_slots_update_receiver: ClusterSlotsUpdateReceiver,
81 exit: Arc<AtomicBool>,
82 ) {
83 let mut cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
84 let mut last_stats = Instant::now();
85 let mut epoch_specs = EpochSpecs::from(bank_forks.clone());
86
87 let root_bank = bank_forks.read().unwrap().root_bank();
89 cluster_slots.update(&root_bank, &cluster_info);
90
91 while !exit.load(Ordering::Relaxed) {
92 let slots = match cluster_slots_update_receiver.recv_timeout(Duration::from_millis(200))
93 {
94 Ok(slots) => Some(slots),
95 Err(RecvTimeoutError::Timeout) => None,
96 Err(RecvTimeoutError::Disconnected) => {
97 warn!("Cluster slots service - sender disconnected");
98 break;
99 }
100 };
101 let mut lowest_slot_elapsed = Measure::start("lowest_slot_elapsed");
102 let lowest_slot = blockstore.lowest_slot();
103 Self::update_lowest_slot(lowest_slot, &cluster_info);
104 lowest_slot_elapsed.stop();
105 let mut process_cluster_slots_updates_elapsed =
106 Measure::start("process_cluster_slots_updates_elapsed");
107
108 if let Some(slots) = slots {
109 let node_id = cluster_info.id();
110 let my_stake = epoch_specs
111 .current_epoch_staked_nodes()
112 .get(&node_id)
113 .cloned()
114 .unwrap_or_default();
115 if my_stake > 0 {
117 Self::process_cluster_slots_updates(
118 slots,
119 &cluster_slots_update_receiver,
120 &cluster_info,
121 );
122 } else {
123 while cluster_slots_update_receiver.try_recv().is_ok() {}
125 }
126 }
127 let root_bank = bank_forks.read().unwrap().root_bank();
128 cluster_slots.update(&root_bank, &cluster_info);
129 process_cluster_slots_updates_elapsed.stop();
130
131 cluster_slots_service_timing.update(
132 lowest_slot_elapsed.as_us(),
133 process_cluster_slots_updates_elapsed.as_us(),
134 );
135
136 if last_stats.elapsed().as_secs() > 2 {
137 datapoint_info!(
138 "cluster_slots_service-timing",
139 (
140 "lowest_slot_elapsed",
141 cluster_slots_service_timing.lowest_slot_elapsed,
142 i64
143 ),
144 (
145 "process_cluster_slots_updates_elapsed",
146 cluster_slots_service_timing.process_cluster_slots_updates_elapsed,
147 i64
148 ),
149 );
150 cluster_slots_service_timing = ClusterSlotsServiceTiming::default();
151 last_stats = Instant::now();
152 }
153 }
154 }
155
156 fn process_cluster_slots_updates(
157 mut slots: Vec<Slot>,
158 cluster_slots_update_receiver: &ClusterSlotsUpdateReceiver,
159 cluster_info: &ClusterInfo,
160 ) {
161 while let Ok(mut more) = cluster_slots_update_receiver.try_recv() {
162 slots.append(&mut more);
163 }
164 #[allow(clippy::stable_sort_primitive)]
165 slots.sort();
166
167 if !slots.is_empty() {
168 cluster_info.push_epoch_slots(&slots);
169 }
170 }
171
172 fn initialize_lowest_slot(blockstore: &Blockstore, cluster_info: &ClusterInfo) {
173 cluster_info.push_lowest_slot(blockstore.lowest_slot());
178 }
179
180 fn update_lowest_slot(lowest_slot: Slot, cluster_info: &ClusterInfo) {
181 cluster_info.push_lowest_slot(lowest_slot);
182 }
183
184 fn initialize_epoch_slots(bank_forks: &RwLock<BankForks>, cluster_info: &ClusterInfo) {
185 let mut frozen_bank_slots: Vec<_> = bank_forks
189 .read()
190 .unwrap()
191 .frozen_banks()
192 .map(|(slot, _bank)| slot)
193 .collect();
194 frozen_bank_slots.sort_unstable();
195
196 if !frozen_bank_slots.is_empty() {
197 cluster_info.push_epoch_slots(&frozen_bank_slots);
198 }
199 }
200}
201
#[cfg(test)]
mod test {
    use {
        super::*,
        solana_gossip::{crds_data::LowestSlot, node::Node},
        solana_keypair::Keypair,
        solana_signer::Signer,
        solana_streamer::socket::SocketAddrSpace,
    };

    /// A lowest slot pushed through `update_lowest_slot` must be readable from
    /// the local CRDS table under this node's pubkey after the push queue is
    /// flushed.
    #[test]
    pub fn test_update_lowest_slot() {
        let identity = Arc::new(Keypair::new());
        let identity_pubkey = identity.pubkey();
        let node = Node::new_localhost_with_pubkey(&identity_pubkey);
        let cluster_info = ClusterInfo::new(node.info, identity, SocketAddrSpace::Unspecified);

        ClusterSlotsService::update_lowest_slot(5, &cluster_info);
        cluster_info.flush_push_queue();

        // Clone the entry so the CRDS read guard is released at the end of
        // this statement, before the assertion runs.
        let published = cluster_info
            .gossip
            .crds
            .read()
            .unwrap()
            .get::<&LowestSlot>(identity_pubkey)
            .unwrap()
            .clone();
        assert_eq!(published.lowest, 5);
    }
}