solana_core/
voting_service.rs

1use {
2    crate::{
3        consensus::tower_storage::{SavedTowerVersions, TowerStorage},
4        mock_alpenglow_consensus::MockAlpenglowConsensus,
5        next_leader::upcoming_leader_tpu_vote_sockets,
6    },
7    bincode::serialize,
8    crossbeam_channel::Receiver,
9    solana_client::connection_cache::ConnectionCache,
10    solana_clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET},
11    solana_connection_cache::client_connection::ClientConnection,
12    solana_gossip::{cluster_info::ClusterInfo, epoch_specs::EpochSpecs},
13    solana_measure::measure::Measure,
14    solana_poh::poh_recorder::PohRecorder,
15    solana_runtime::bank_forks::BankForks,
16    solana_transaction::Transaction,
17    solana_transaction_error::TransportError,
18    std::{
19        net::{SocketAddr, UdpSocket},
20        sync::{Arc, RwLock},
21        thread::{self, Builder, JoinHandle},
22    },
23    thiserror::Error,
24};
25
26pub enum VoteOp {
27    PushVote {
28        tx: Transaction,
29        tower_slots: Vec<Slot>,
30        saved_tower: SavedTowerVersions,
31    },
32    RefreshVote {
33        tx: Transaction,
34        last_voted_slot: Slot,
35    },
36}
37
38impl VoteOp {
39    fn tx(&self) -> &Transaction {
40        match self {
41            VoteOp::PushVote { tx, .. } => tx,
42            VoteOp::RefreshVote { tx, .. } => tx,
43        }
44    }
45}
46
47#[derive(Debug, Error)]
48enum SendVoteError {
49    #[error(transparent)]
50    BincodeError(#[from] bincode::Error),
51    #[error("Invalid TPU address")]
52    InvalidTpuAddress,
53    #[error(transparent)]
54    TransportError(#[from] TransportError),
55}
56
57fn send_vote_transaction(
58    cluster_info: &ClusterInfo,
59    transaction: &Transaction,
60    tpu: Option<SocketAddr>,
61    connection_cache: &Arc<ConnectionCache>,
62) -> Result<(), SendVoteError> {
63    let tpu = tpu
64        .or_else(|| {
65            cluster_info
66                .my_contact_info()
67                .tpu(connection_cache.protocol())
68        })
69        .ok_or(SendVoteError::InvalidTpuAddress)?;
70    let buf = Arc::new(serialize(transaction)?);
71    let client = connection_cache.get_connection(&tpu);
72
73    client.send_data_async(buf).map_err(|err| {
74        error!("Ran into an error when sending vote: {err:?} to {tpu:?}");
75        SendVoteError::from(err)
76    })
77}
78
/// Handle to the background thread that processes [`VoteOp`]s.
pub struct VotingService {
    /// Join handle of the `solVoteService` thread.
    thread_hdl: JoinHandle<()>,
}
82
83impl VotingService {
84    pub fn new(
85        vote_receiver: Receiver<VoteOp>,
86        cluster_info: Arc<ClusterInfo>,
87        poh_recorder: Arc<RwLock<PohRecorder>>,
88        tower_storage: Arc<dyn TowerStorage>,
89        connection_cache: Arc<ConnectionCache>,
90        alpenglow_socket: Option<UdpSocket>,
91        bank_forks: Arc<RwLock<BankForks>>,
92    ) -> Self {
93        let thread_hdl = Builder::new()
94            .name("solVoteService".to_string())
95            .spawn({
96                let mut mock_alpenglow = alpenglow_socket.map(|s| {
97                    MockAlpenglowConsensus::new(
98                        s,
99                        cluster_info.clone(),
100                        EpochSpecs::from(bank_forks.clone()),
101                    )
102                });
103                move || {
104                    for vote_op in vote_receiver.iter() {
105                        // Figure out if we are casting a vote for a new slot, and what slot it is for
106                        let vote_slot = match vote_op {
107                            VoteOp::PushVote {
108                                tx: _,
109                                ref tower_slots,
110                                ..
111                            } => tower_slots.iter().copied().last(),
112                            _ => None,
113                        };
114                        // perform all the normal vote handling routines
115                        Self::handle_vote(
116                            &cluster_info,
117                            &poh_recorder,
118                            tower_storage.as_ref(),
119                            vote_op,
120                            connection_cache.clone(),
121                        );
122                        // trigger mock alpenglow vote if we have just cast an actual vote
123                        if let Some(slot) = vote_slot {
124                            if let Some(ag) = mock_alpenglow.as_mut() {
125                                let root_bank = { bank_forks.read().unwrap().root_bank() };
126                                ag.signal_new_slot(slot, &root_bank);
127                            }
128                        }
129                    }
130                    if let Some(ag) = mock_alpenglow {
131                        let _ = ag.join();
132                    }
133                }
134            })
135            .unwrap();
136        Self { thread_hdl }
137    }
138
139    pub fn handle_vote(
140        cluster_info: &ClusterInfo,
141        poh_recorder: &RwLock<PohRecorder>,
142        tower_storage: &dyn TowerStorage,
143        vote_op: VoteOp,
144        connection_cache: Arc<ConnectionCache>,
145    ) {
146        if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
147            let mut measure = Measure::start("tower storage save");
148            if let Err(err) = tower_storage.store(saved_tower) {
149                error!("Unable to save tower to storage: {err:?}");
150                std::process::exit(1);
151            }
152            measure.stop();
153            trace!("{measure}");
154        }
155
156        // Attempt to send our vote transaction to the leaders for the next few
157        // slots. From the current slot to the forwarding slot offset
158        // (inclusive).
159        const UPCOMING_LEADER_FANOUT_SLOTS: u64 =
160            FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET.saturating_add(1);
161        #[cfg(test)]
162        static_assertions::const_assert_eq!(UPCOMING_LEADER_FANOUT_SLOTS, 3);
163        let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets(
164            cluster_info,
165            poh_recorder,
166            UPCOMING_LEADER_FANOUT_SLOTS,
167            connection_cache.protocol(),
168        );
169
170        if !upcoming_leader_sockets.is_empty() {
171            for tpu_vote_socket in upcoming_leader_sockets {
172                let _ = send_vote_transaction(
173                    cluster_info,
174                    vote_op.tx(),
175                    Some(tpu_vote_socket),
176                    &connection_cache,
177                );
178            }
179        } else {
180            // Send to our own tpu vote socket if we cannot find a leader to send to
181            let _ = send_vote_transaction(cluster_info, vote_op.tx(), None, &connection_cache);
182        }
183
184        match vote_op {
185            VoteOp::PushVote {
186                tx, tower_slots, ..
187            } => {
188                cluster_info.push_vote(&tower_slots, tx);
189            }
190            VoteOp::RefreshVote {
191                tx,
192                last_voted_slot,
193            } => {
194                cluster_info.refresh_vote(tx, last_voted_slot);
195            }
196        }
197    }
198
199    pub fn join(self) -> thread::Result<()> {
200        self.thread_hdl.join()
201    }
202}