solana_core/
voting_service.rs1use {
2 crate::{
3 consensus::tower_storage::{SavedTowerVersions, TowerStorage},
4 next_leader::upcoming_leader_tpu_vote_sockets,
5 },
6 bincode::serialize,
7 crossbeam_channel::Receiver,
8 solana_client::connection_cache::ConnectionCache,
9 solana_clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET},
10 solana_connection_cache::client_connection::ClientConnection,
11 solana_gossip::cluster_info::ClusterInfo,
12 solana_measure::measure::Measure,
13 solana_poh::poh_recorder::PohRecorder,
14 solana_transaction::Transaction,
15 solana_transaction_error::TransportError,
16 std::{
17 net::SocketAddr,
18 sync::{Arc, RwLock},
19 thread::{self, Builder, JoinHandle},
20 },
21 thiserror::Error,
22};
23
24pub enum VoteOp {
25 PushVote {
26 tx: Transaction,
27 tower_slots: Vec<Slot>,
28 saved_tower: SavedTowerVersions,
29 },
30 RefreshVote {
31 tx: Transaction,
32 last_voted_slot: Slot,
33 },
34}
35
36impl VoteOp {
37 fn tx(&self) -> &Transaction {
38 match self {
39 VoteOp::PushVote { tx, .. } => tx,
40 VoteOp::RefreshVote { tx, .. } => tx,
41 }
42 }
43}
44
45#[derive(Debug, Error)]
46enum SendVoteError {
47 #[error(transparent)]
48 BincodeError(#[from] bincode::Error),
49 #[error("Invalid TPU address")]
50 InvalidTpuAddress,
51 #[error(transparent)]
52 TransportError(#[from] TransportError),
53}
54
55fn send_vote_transaction(
56 cluster_info: &ClusterInfo,
57 transaction: &Transaction,
58 tpu: Option<SocketAddr>,
59 connection_cache: &Arc<ConnectionCache>,
60) -> Result<(), SendVoteError> {
61 let tpu = tpu
62 .or_else(|| {
63 cluster_info
64 .my_contact_info()
65 .tpu(connection_cache.protocol())
66 })
67 .ok_or(SendVoteError::InvalidTpuAddress)?;
68 let buf = serialize(transaction)?;
69 let client = connection_cache.get_connection(&tpu);
70
71 client.send_data_async(buf).map_err(|err| {
72 trace!("Ran into an error when sending vote: {err:?} to {tpu:?}");
73 SendVoteError::from(err)
74 })
75}
76
/// Owns the background thread that processes incoming `VoteOp`s.
pub struct VotingService {
    /// Join handle of the spawned vote-processing thread.
    thread_hdl: JoinHandle<()>,
}
80
81impl VotingService {
82 pub fn new(
83 vote_receiver: Receiver<VoteOp>,
84 cluster_info: Arc<ClusterInfo>,
85 poh_recorder: Arc<RwLock<PohRecorder>>,
86 tower_storage: Arc<dyn TowerStorage>,
87 connection_cache: Arc<ConnectionCache>,
88 ) -> Self {
89 let thread_hdl = Builder::new()
90 .name("solVoteService".to_string())
91 .spawn(move || {
92 for vote_op in vote_receiver.iter() {
93 Self::handle_vote(
94 &cluster_info,
95 &poh_recorder,
96 tower_storage.as_ref(),
97 vote_op,
98 connection_cache.clone(),
99 );
100 }
101 })
102 .unwrap();
103 Self { thread_hdl }
104 }
105
106 pub fn handle_vote(
107 cluster_info: &ClusterInfo,
108 poh_recorder: &RwLock<PohRecorder>,
109 tower_storage: &dyn TowerStorage,
110 vote_op: VoteOp,
111 connection_cache: Arc<ConnectionCache>,
112 ) {
113 if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
114 let mut measure = Measure::start("tower storage save");
115 if let Err(err) = tower_storage.store(saved_tower) {
116 error!("Unable to save tower to storage: {:?}", err);
117 std::process::exit(1);
118 }
119 measure.stop();
120 trace!("{measure}");
121 }
122
123 const UPCOMING_LEADER_FANOUT_SLOTS: u64 =
127 FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET.saturating_add(1);
128 #[cfg(test)]
129 static_assertions::const_assert_eq!(UPCOMING_LEADER_FANOUT_SLOTS, 3);
130 let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets(
131 cluster_info,
132 poh_recorder,
133 UPCOMING_LEADER_FANOUT_SLOTS,
134 connection_cache.protocol(),
135 );
136
137 if !upcoming_leader_sockets.is_empty() {
138 for tpu_vote_socket in upcoming_leader_sockets {
139 let _ = send_vote_transaction(
140 cluster_info,
141 vote_op.tx(),
142 Some(tpu_vote_socket),
143 &connection_cache,
144 );
145 }
146 } else {
147 let _ = send_vote_transaction(cluster_info, vote_op.tx(), None, &connection_cache);
149 }
150
151 match vote_op {
152 VoteOp::PushVote {
153 tx, tower_slots, ..
154 } => {
155 cluster_info.push_vote(&tower_slots, tx);
156 }
157 VoteOp::RefreshVote {
158 tx,
159 last_voted_slot,
160 } => {
161 cluster_info.refresh_vote(tx, last_voted_slot);
162 }
163 }
164 }
165
166 pub fn join(self) -> thread::Result<()> {
167 self.thread_hdl.join()
168 }
169}