solana_core/
voting_service.rs

1use {
2    crate::{
3        consensus::tower_storage::{SavedTowerVersions, TowerStorage},
4        next_leader::upcoming_leader_tpu_vote_sockets,
5    },
6    bincode::serialize,
7    crossbeam_channel::Receiver,
8    solana_client::connection_cache::ConnectionCache,
9    solana_clock::{Slot, FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET},
10    solana_connection_cache::client_connection::ClientConnection,
11    solana_gossip::cluster_info::ClusterInfo,
12    solana_measure::measure::Measure,
13    solana_poh::poh_recorder::PohRecorder,
14    solana_transaction::Transaction,
15    solana_transaction_error::TransportError,
16    std::{
17        net::SocketAddr,
18        sync::{Arc, RwLock},
19        thread::{self, Builder, JoinHandle},
20    },
21    thiserror::Error,
22};
23
24pub enum VoteOp {
25    PushVote {
26        tx: Transaction,
27        tower_slots: Vec<Slot>,
28        saved_tower: SavedTowerVersions,
29    },
30    RefreshVote {
31        tx: Transaction,
32        last_voted_slot: Slot,
33    },
34}
35
36impl VoteOp {
37    fn tx(&self) -> &Transaction {
38        match self {
39            VoteOp::PushVote { tx, .. } => tx,
40            VoteOp::RefreshVote { tx, .. } => tx,
41        }
42    }
43}
44
45#[derive(Debug, Error)]
46enum SendVoteError {
47    #[error(transparent)]
48    BincodeError(#[from] bincode::Error),
49    #[error("Invalid TPU address")]
50    InvalidTpuAddress,
51    #[error(transparent)]
52    TransportError(#[from] TransportError),
53}
54
55fn send_vote_transaction(
56    cluster_info: &ClusterInfo,
57    transaction: &Transaction,
58    tpu: Option<SocketAddr>,
59    connection_cache: &Arc<ConnectionCache>,
60) -> Result<(), SendVoteError> {
61    let tpu = tpu
62        .or_else(|| {
63            cluster_info
64                .my_contact_info()
65                .tpu(connection_cache.protocol())
66        })
67        .ok_or(SendVoteError::InvalidTpuAddress)?;
68    let buf = serialize(transaction)?;
69    let client = connection_cache.get_connection(&tpu);
70
71    client.send_data_async(buf).map_err(|err| {
72        trace!("Ran into an error when sending vote: {err:?} to {tpu:?}");
73        SendVoteError::from(err)
74    })
75}
76
77pub struct VotingService {
78    thread_hdl: JoinHandle<()>,
79}
80
81impl VotingService {
82    pub fn new(
83        vote_receiver: Receiver<VoteOp>,
84        cluster_info: Arc<ClusterInfo>,
85        poh_recorder: Arc<RwLock<PohRecorder>>,
86        tower_storage: Arc<dyn TowerStorage>,
87        connection_cache: Arc<ConnectionCache>,
88    ) -> Self {
89        let thread_hdl = Builder::new()
90            .name("solVoteService".to_string())
91            .spawn(move || {
92                for vote_op in vote_receiver.iter() {
93                    Self::handle_vote(
94                        &cluster_info,
95                        &poh_recorder,
96                        tower_storage.as_ref(),
97                        vote_op,
98                        connection_cache.clone(),
99                    );
100                }
101            })
102            .unwrap();
103        Self { thread_hdl }
104    }
105
106    pub fn handle_vote(
107        cluster_info: &ClusterInfo,
108        poh_recorder: &RwLock<PohRecorder>,
109        tower_storage: &dyn TowerStorage,
110        vote_op: VoteOp,
111        connection_cache: Arc<ConnectionCache>,
112    ) {
113        if let VoteOp::PushVote { saved_tower, .. } = &vote_op {
114            let mut measure = Measure::start("tower storage save");
115            if let Err(err) = tower_storage.store(saved_tower) {
116                error!("Unable to save tower to storage: {:?}", err);
117                std::process::exit(1);
118            }
119            measure.stop();
120            trace!("{measure}");
121        }
122
123        // Attempt to send our vote transaction to the leaders for the next few
124        // slots. From the current slot to the forwarding slot offset
125        // (inclusive).
126        const UPCOMING_LEADER_FANOUT_SLOTS: u64 =
127            FORWARD_TRANSACTIONS_TO_LEADER_AT_SLOT_OFFSET.saturating_add(1);
128        #[cfg(test)]
129        static_assertions::const_assert_eq!(UPCOMING_LEADER_FANOUT_SLOTS, 3);
130        let upcoming_leader_sockets = upcoming_leader_tpu_vote_sockets(
131            cluster_info,
132            poh_recorder,
133            UPCOMING_LEADER_FANOUT_SLOTS,
134            connection_cache.protocol(),
135        );
136
137        if !upcoming_leader_sockets.is_empty() {
138            for tpu_vote_socket in upcoming_leader_sockets {
139                let _ = send_vote_transaction(
140                    cluster_info,
141                    vote_op.tx(),
142                    Some(tpu_vote_socket),
143                    &connection_cache,
144                );
145            }
146        } else {
147            // Send to our own tpu vote socket if we cannot find a leader to send to
148            let _ = send_vote_transaction(cluster_info, vote_op.tx(), None, &connection_cache);
149        }
150
151        match vote_op {
152            VoteOp::PushVote {
153                tx, tower_slots, ..
154            } => {
155                cluster_info.push_vote(&tower_slots, tx);
156            }
157            VoteOp::RefreshVote {
158                tx,
159                last_voted_slot,
160            } => {
161                cluster_info.refresh_vote(tx, last_voted_slot);
162            }
163        }
164    }
165
166    pub fn join(self) -> thread::Result<()> {
167        self.thread_hdl.join()
168    }
169}