1use {
2 crate::{
3 consensus::tower_storage::{SavedTowerVersions, TowerStorage},
4 mock_alpenglow_consensus::MockAlpenglowConsensus,
5 next_leader::upcoming_leader_tpu_vote_sockets,
6 },
7 bincode::serialize,
8 crossbeam_channel::Receiver,
9 solana_client::connection_cache::ConnectionCache,
10 solana_clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET},
11 solana_connection_cache::client_connection::ClientConnection,
12 solana_gossip::{cluster_info::ClusterInfo, epoch_specs::EpochSpecs},
13 solana_measure::measure::Measure,
14 solana_poh::poh_recorder::PohRecorder,
15 solana_runtime::bank_forks::BankForks,
16 solana_transaction::Transaction,
17 solana_transaction_error::TransportError,
18 std::{
19 net::{SocketAddr, UdpSocket},
20 sync::{Arc, RwLock},
21 thread::{self, Builder, JoinHandle},
22 },
23 thiserror::Error,
24};
25
26pub enum VoteOp {
27 PushVote {
28 tx: Transaction,
29 tower_slots: Vec<Slot>,
30 saved_tower: SavedTowerVersions,
31 },
32 RefreshVote {
33 tx: Transaction,
34 last_voted_slot: Slot,
35 },
36}
37
38impl VoteOp {
39 fn tx(&self) -> &Transaction {
40 match self {
41 VoteOp::PushVote { tx, .. } => tx,
42 VoteOp::RefreshVote { tx, .. } => tx,
43 }
44 }
45}
46
47#[derive(Debug, Error)]
48enum SendVoteError {
49 #[error(transparent)]
50 BincodeError(#[from] bincode::Error),
51 #[error("Invalid TPU address")]
52 InvalidTpuAddress,
53 #[error(transparent)]
54 TransportError(#[from] TransportError),
55}
56
57fn send_vote_transaction(
58 cluster_info: &ClusterInfo,
59 transaction: &Transaction,
60 tpu: Option<SocketAddr>,
61 connection_cache: &Arc<ConnectionCache>,
62) -> Result<(), SendVoteError> {
63 let tpu = tpu
64 .or_else(|| {
65 cluster_info
66 .my_contact_info()
67 .tpu(connection_cache.protocol())
68 })
69 .ok_or(SendVoteError::InvalidTpuAddress)?;
70 let buf = Arc::new(serialize(transaction)?);
71 let client = connection_cache.get_connection(&tpu);
72
73 client.send_data_async(buf).map_err(|err| {
74 error!("Ran into an error when sending vote: {err:?} to {tpu:?}");
75 SendVoteError::from(err)
76 })
77}
78
/// Handle to the background thread that processes [`VoteOp`]s.
///
/// Created with [`VotingService::new`]; call [`VotingService::join`] to wait
/// for the thread to finish.
pub struct VotingService {
    /// Join handle of the spawned `solVoteService` thread.
    thread_hdl: JoinHandle<()>,
}
82
83impl VotingService {
84 pub fn new(
85 vote_receiver: Receiver<VoteOp>,
86 cluster_info: Arc<ClusterInfo>,
87 poh_recorder: Arc<RwLock<PohRecorder>>,
88 tower_storage: Arc<dyn TowerStorage>,
89 connection_cache: Arc<ConnectionCache>,
90 alpenglow_socket: Option<UdpSocket>,
91 bank_forks: Arc<RwLock<BankForks>>,
92 ) -> Self {
93 let thread_hdl = Builder::new()
94 .name("solVoteService".to_string())
95 .spawn({
96 let mut mock_alpenglow = alpenglow_socket.map(|s| {
97 MockAlpenglowConsensus::new(
98 s,
99 cluster_info.clone(),
100 EpochSpecs::from(bank_forks.clone()),
101 )
102 });
103 move || {
104 for vote_op in vote_receiver.iter() {
105 let vote_slot = match vote_op {
107 VoteOp::PushVote {
108 tx: _,
109 ref tower_slots,
110 ..
111 } => tower_slots.iter().copied().last(),
112 _ => None,
113 };
114 Self::handle_vote(
116 &cluster_info,
117 &poh_recorder,
118 tower_storage.as_ref(),
119 vote_op,
120 connection_cache.clone(),
121 );
122 if let Some(slot) = vote_slot {
124 if let Some(ag) = mock_alpenglow.as_mut() {
125 let root_bank = { bank_forks.read().unwrap().root_bank() };
126 ag.signal_new_slot(slot, &root_bank);
127 }
128 }
129 }
130 if let Some(ag) = mock_alpenglow {
131 let _ = ag.join();
132 }
133 }
134 })
135 .unwrap();
136 Self { thread_hdl }
137 }
138
139 pub fn handle_vote(
140 cluster_info: &ClusterInfo,
141 poh_recorder: &RwLock<PohRecorder>,
142 tower_storage: &dyn TowerStorage,
143 vote_op: VoteOp,
144 connection_cache: Arc<ConnectionCache>,
145 ) {
146 if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
147 let mut measure = Measure::start("tower storage save");
148 if let Err(err) = tower_storage.store(saved_tower) {
149 error!("Unable to save tower to storage: {err:?}");
150 std::process::exit(1);
151 }
152 measure.stop();
153 trace!("{measure}");
154 }
155
156 const UPCOMING_LEADER_FANOUT_SLOTS: u64 =
160 FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET.saturating_add(1);
161 #[cfg(test)]
162 static_assertions::const_assert_eq!(UPCOMING_LEADER_FANOUT_SLOTS, 3);
163 let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets(
164 cluster_info,
165 poh_recorder,
166 UPCOMING_LEADER_FANOUT_SLOTS,
167 connection_cache.protocol(),
168 );
169
170 if !upcoming_leader_sockets.is_empty() {
171 for tpu_vote_socket in upcoming_leader_sockets {
172 let _ = send_vote_transaction(
173 cluster_info,
174 vote_op.tx(),
175 Some(tpu_vote_socket),
176 &connection_cache,
177 );
178 }
179 } else {
180 let _ = send_vote_transaction(cluster_info, vote_op.tx(), None, &connection_cache);
182 }
183
184 match vote_op {
185 VoteOp::PushVote {
186 tx, tower_slots, ..
187 } => {
188 cluster_info.push_vote(&tower_slots, tx);
189 }
190 VoteOp::RefreshVote {
191 tx,
192 last_voted_slot,
193 } => {
194 cluster_info.refresh_vote(tx, last_voted_slot);
195 }
196 }
197 }
198
199 pub fn join(self) -> thread::Result<()> {
200 self.thread_hdl.join()
201 }
202}