solana_core/consensus/
progress_map.rs

1use {
2    crate::{
3        cluster_info_vote_listener::SlotVoteTracker,
4        cluster_slots_service::slot_supporters::SlotSupporters,
5        consensus::{Stake, ThresholdDecision, VotedStakes},
6        replay_stage::SUPERMINORITY_THRESHOLD,
7    },
8    solana_clock::Slot,
9    solana_hash::Hash,
10    solana_ledger::blockstore_processor::{ConfirmationProgress, ReplaySlotStats},
11    solana_pubkey::Pubkey,
12    solana_runtime::{bank::Bank, bank_forks::BankForks},
13    solana_vote::vote_account::VoteAccountsHashMap,
14    std::{
15        collections::{BTreeMap, HashMap, HashSet},
16        sync::{Arc, RwLock},
17        time::Instant,
18    },
19};
20
21type VotedSlot = Slot;
22type ExpirationSlot = Slot;
23pub type LockoutIntervals = BTreeMap<ExpirationSlot, Vec<(VotedSlot, Pubkey)>>;
24
25#[derive(Debug)]
26pub struct ValidatorStakeInfo {
27    pub validator_vote_pubkey: Pubkey,
28    pub stake: u64,
29    pub total_epoch_stake: u64,
30}
31
32impl Default for ValidatorStakeInfo {
33    fn default() -> Self {
34        Self {
35            stake: 0,
36            validator_vote_pubkey: Pubkey::default(),
37            total_epoch_stake: 1,
38        }
39    }
40}
41
42impl ValidatorStakeInfo {
43    pub fn new(validator_vote_pubkey: Pubkey, stake: u64, total_epoch_stake: u64) -> Self {
44        Self {
45            validator_vote_pubkey,
46            stake,
47            total_epoch_stake,
48        }
49    }
50}
51
52pub const RETRANSMIT_BASE_DELAY_MS: u64 = 5_000;
53pub const RETRANSMIT_BACKOFF_CAP: u32 = 6;
54
55#[derive(Debug)]
56pub struct RetransmitInfo {
57    pub(crate) retry_time: Instant,
58    pub(crate) retry_iteration: u32,
59}
60
61impl RetransmitInfo {
62    pub fn reached_retransmit_threshold(&self) -> bool {
63        let backoff = std::cmp::min(self.retry_iteration, RETRANSMIT_BACKOFF_CAP);
64        let backoff_duration_ms = (1_u64 << backoff) * RETRANSMIT_BASE_DELAY_MS;
65        self.retry_time.elapsed().as_millis() > u128::from(backoff_duration_ms)
66    }
67
68    pub fn increment_retry_iteration(&mut self) {
69        self.retry_iteration = self.retry_iteration.saturating_add(1);
70        self.retry_time = Instant::now();
71    }
72}
73
74pub struct ForkProgress {
75    pub is_dead: bool,
76    pub fork_stats: ForkStats,
77    pub propagated_stats: PropagatedStats,
78    pub replay_stats: Arc<RwLock<ReplaySlotStats>>,
79    pub replay_progress: Arc<RwLock<ConfirmationProgress>>,
80    pub retransmit_info: RetransmitInfo,
81    // Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
82    // count new blocks replayed since last restart, which won't include
83    // blocks already existing in the ledger/before snapshot at start,
84    // so these stats do not span all of time
85    pub num_blocks_on_fork: u64,
86    pub num_dropped_blocks_on_fork: u64,
87}
88
89impl ForkProgress {
90    pub fn new(
91        last_entry: Hash,
92        prev_leader_slot: Option<Slot>,
93        validator_stake_info: Option<ValidatorStakeInfo>,
94        num_blocks_on_fork: u64,
95        num_dropped_blocks_on_fork: u64,
96    ) -> Self {
97        let (
98            is_leader_slot,
99            propagated_validators_stake,
100            propagated_validators,
101            is_propagated,
102            total_epoch_stake,
103        ) = validator_stake_info
104            .map(|info| {
105                (
106                    true,
107                    info.stake,
108                    vec![info.validator_vote_pubkey].into_iter().collect(),
109                    {
110                        if info.total_epoch_stake == 0 {
111                            true
112                        } else {
113                            info.stake as f64 / info.total_epoch_stake as f64
114                                > SUPERMINORITY_THRESHOLD
115                        }
116                    },
117                    info.total_epoch_stake,
118                )
119            })
120            .unwrap_or((false, 0, HashSet::new(), false, 0));
121
122        Self {
123            is_dead: false,
124            fork_stats: ForkStats::default(),
125            replay_stats: Arc::new(RwLock::new(ReplaySlotStats::default())),
126            replay_progress: Arc::new(RwLock::new(ConfirmationProgress::new(last_entry))),
127            num_blocks_on_fork,
128            num_dropped_blocks_on_fork,
129            propagated_stats: PropagatedStats {
130                propagated_validators,
131                propagated_validators_stake,
132                is_propagated,
133                is_leader_slot,
134                prev_leader_slot,
135                total_epoch_stake,
136                ..PropagatedStats::default()
137            },
138            retransmit_info: RetransmitInfo {
139                retry_time: Instant::now(),
140                retry_iteration: 0u32,
141            },
142        }
143    }
144
145    pub fn new_from_bank(
146        bank: &Bank,
147        validator_identity: &Pubkey,
148        validator_vote_pubkey: &Pubkey,
149        prev_leader_slot: Option<Slot>,
150        num_blocks_on_fork: u64,
151        num_dropped_blocks_on_fork: u64,
152    ) -> Self {
153        let validator_stake_info = {
154            if bank.collector_id() == validator_identity {
155                Some(ValidatorStakeInfo::new(
156                    *validator_vote_pubkey,
157                    bank.epoch_vote_account_stake(validator_vote_pubkey),
158                    bank.total_epoch_stake(),
159                ))
160            } else {
161                None
162            }
163        };
164
165        let mut new_progress = Self::new(
166            bank.last_blockhash(),
167            prev_leader_slot,
168            validator_stake_info,
169            num_blocks_on_fork,
170            num_dropped_blocks_on_fork,
171        );
172
173        if bank.is_frozen() {
174            new_progress.fork_stats.bank_hash = Some(bank.hash());
175        }
176        new_progress
177    }
178}
179
180#[derive(Debug, Clone, Default)]
181pub struct ForkStats {
182    pub fork_stake: Stake,
183    pub total_stake: Stake,
184    pub block_height: u64,
185    pub has_voted: bool,
186    pub is_recent: bool,
187    pub is_empty: bool,
188    pub vote_threshold: Vec<ThresholdDecision>,
189    pub is_locked_out: bool,
190    pub voted_stakes: VotedStakes,
191    pub duplicate_confirmed_hash: Option<Hash>,
192    pub computed: bool,
193    pub lockout_intervals: LockoutIntervals,
194    pub bank_hash: Option<Hash>,
195    pub my_latest_landed_vote: Option<Slot>,
196}
197
198impl ForkStats {
199    /// Return fork_weight, i.e. bank_stake over total_stake.
200    pub fn fork_weight(&self) -> f64 {
201        self.fork_stake as f64 / self.total_stake as f64
202    }
203}
204
205#[derive(Clone, Default)]
206pub struct PropagatedStats {
207    pub propagated_validators: HashSet<Pubkey>,
208    pub propagated_node_ids: HashSet<Pubkey>,
209    pub propagated_validators_stake: u64,
210    pub is_propagated: bool,
211    pub is_leader_slot: bool,
212    pub prev_leader_slot: Option<Slot>,
213    pub slot_vote_tracker: Option<Arc<RwLock<SlotVoteTracker>>>,
214    pub cluster_slot_pubkeys: Option<Arc<SlotSupporters>>,
215    pub total_epoch_stake: u64,
216}
217
218impl PropagatedStats {
219    pub fn add_vote_pubkey(&mut self, vote_pubkey: Pubkey, stake: u64) {
220        if self.propagated_validators.insert(vote_pubkey) {
221            self.propagated_validators_stake += stake;
222        }
223    }
224
225    pub fn add_node_pubkey(&mut self, node_pubkey: &Pubkey, bank: &Bank) {
226        if !self.propagated_node_ids.contains(node_pubkey) {
227            let node_vote_accounts = bank
228                .epoch_vote_accounts_for_node_id(node_pubkey)
229                .map(|v| &v.vote_accounts);
230
231            if let Some(node_vote_accounts) = node_vote_accounts {
232                self.add_node_pubkey_internal(
233                    node_pubkey,
234                    node_vote_accounts,
235                    bank.epoch_vote_accounts(bank.epoch())
236                        .expect("Epoch stakes for bank's own epoch must exist"),
237                );
238            }
239        }
240    }
241
242    fn add_node_pubkey_internal(
243        &mut self,
244        node_pubkey: &Pubkey,
245        vote_account_pubkeys: &[Pubkey],
246        epoch_vote_accounts: &VoteAccountsHashMap,
247    ) {
248        self.propagated_node_ids.insert(*node_pubkey);
249        for vote_account_pubkey in vote_account_pubkeys.iter() {
250            let stake = epoch_vote_accounts
251                .get(vote_account_pubkey)
252                .map(|(stake, _)| *stake)
253                .unwrap_or(0);
254            self.add_vote_pubkey(*vote_account_pubkey, stake);
255        }
256    }
257}
258
259#[derive(Default)]
260pub struct ProgressMap {
261    progress_map: HashMap<Slot, ForkProgress>,
262}
263
264impl std::ops::Deref for ProgressMap {
265    type Target = HashMap<Slot, ForkProgress>;
266    fn deref(&self) -> &Self::Target {
267        &self.progress_map
268    }
269}
270
271impl std::ops::DerefMut for ProgressMap {
272    fn deref_mut(&mut self) -> &mut Self::Target {
273        &mut self.progress_map
274    }
275}
276
277impl ProgressMap {
278    pub fn insert(&mut self, slot: Slot, fork_progress: ForkProgress) {
279        self.progress_map.insert(slot, fork_progress);
280    }
281
282    pub fn get_propagated_stats(&self, slot: Slot) -> Option<&PropagatedStats> {
283        self.progress_map
284            .get(&slot)
285            .map(|fork_progress| &fork_progress.propagated_stats)
286    }
287
288    pub fn get_propagated_stats_mut(&mut self, slot: Slot) -> Option<&mut PropagatedStats> {
289        self.progress_map
290            .get_mut(&slot)
291            .map(|fork_progress| &mut fork_progress.propagated_stats)
292    }
293
294    pub fn get_propagated_stats_must_exist(&self, slot: Slot) -> &PropagatedStats {
295        self.get_propagated_stats(slot)
296            .unwrap_or_else(|| panic!("slot={slot} must exist in ProgressMap"))
297    }
298
299    pub fn get_fork_stats(&self, slot: Slot) -> Option<&ForkStats> {
300        self.progress_map
301            .get(&slot)
302            .map(|fork_progress| &fork_progress.fork_stats)
303    }
304
305    pub fn get_fork_stats_mut(&mut self, slot: Slot) -> Option<&mut ForkStats> {
306        self.progress_map
307            .get_mut(&slot)
308            .map(|fork_progress| &mut fork_progress.fork_stats)
309    }
310
311    pub fn get_retransmit_info(&self, slot: Slot) -> Option<&RetransmitInfo> {
312        self.progress_map
313            .get(&slot)
314            .map(|fork_progress| &fork_progress.retransmit_info)
315    }
316
317    pub fn get_retransmit_info_mut(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> {
318        self.progress_map
319            .get_mut(&slot)
320            .map(|fork_progress| &mut fork_progress.retransmit_info)
321    }
322
323    pub fn is_dead(&self, slot: Slot) -> Option<bool> {
324        self.progress_map
325            .get(&slot)
326            .map(|fork_progress| fork_progress.is_dead)
327    }
328
329    pub fn get_hash(&self, slot: Slot) -> Option<Hash> {
330        self.progress_map
331            .get(&slot)
332            .and_then(|fork_progress| fork_progress.fork_stats.bank_hash)
333    }
334
335    pub fn is_propagated(&self, slot: Slot) -> Option<bool> {
336        self.get_propagated_stats(slot)
337            .map(|stats| stats.is_propagated)
338    }
339
340    pub fn get_latest_leader_slot_must_exist(&self, slot: Slot) -> Option<Slot> {
341        let propagated_stats = self.get_propagated_stats_must_exist(slot);
342        if propagated_stats.is_leader_slot {
343            Some(slot)
344        } else {
345            propagated_stats.prev_leader_slot
346        }
347    }
348
349    pub fn get_leader_propagation_slot_must_exist(&self, slot: Slot) -> (bool, Option<Slot>) {
350        if let Some(leader_slot) = self.get_latest_leader_slot_must_exist(slot) {
351            // If the leader's stats are None (isn't in the
352            // progress map), this means that prev_leader slot is
353            // rooted, so return true
354            (
355                self.is_propagated(leader_slot).unwrap_or(true),
356                Some(leader_slot),
357            )
358        } else {
359            // prev_leader_slot doesn't exist because already rooted
360            // or this validator hasn't been scheduled as a leader
361            // yet. In both cases the latest leader is vacuously
362            // confirmed
363            (true, None)
364        }
365    }
366
367    pub fn my_latest_landed_vote(&self, slot: Slot) -> Option<Slot> {
368        self.progress_map
369            .get(&slot)
370            .and_then(|s| s.fork_stats.my_latest_landed_vote)
371    }
372
373    pub fn set_duplicate_confirmed_hash(&mut self, slot: Slot, hash: Hash) {
374        let slot_progress = self.get_mut(&slot).unwrap();
375        slot_progress.fork_stats.duplicate_confirmed_hash = Some(hash);
376    }
377
378    pub fn is_duplicate_confirmed(&self, slot: Slot) -> Option<bool> {
379        self.progress_map
380            .get(&slot)
381            .map(|s| s.fork_stats.duplicate_confirmed_hash.is_some())
382    }
383
384    pub fn get_bank_prev_leader_slot(&self, bank: &Bank) -> Option<Slot> {
385        let parent_slot = bank.parent_slot();
386        self.get_propagated_stats(parent_slot)
387            .map(|stats| {
388                if stats.is_leader_slot {
389                    Some(parent_slot)
390                } else {
391                    stats.prev_leader_slot
392                }
393            })
394            .unwrap_or(None)
395    }
396
397    pub fn handle_new_root(&mut self, bank_forks: &BankForks) {
398        self.progress_map
399            .retain(|k, _| bank_forks.get(*k).is_some());
400    }
401
402    pub fn log_propagated_stats(&self, slot: Slot, bank_forks: &RwLock<BankForks>) {
403        if let Some(stats) = self.get_propagated_stats(slot) {
404            info!(
405                "Propagated stats: \
406                 total staked: {}, \
407                 observed staked: {}, \
408                 vote pubkeys: {:?}, \
409                 node_pubkeys: {:?}, \
410                 slot: {slot}, \
411                 epoch: {:?}",
412                stats.total_epoch_stake,
413                stats.propagated_validators_stake,
414                stats.propagated_validators,
415                stats.propagated_node_ids,
416                bank_forks.read().unwrap().get(slot).map(|x| x.epoch()),
417            );
418        }
419    }
420}
421
422#[cfg(test)]
423mod test {
424    use {super::*, solana_vote::vote_account::VoteAccount};
425
426    #[test]
427    fn test_add_vote_pubkey() {
428        let mut stats = PropagatedStats::default();
429        let mut vote_pubkey = solana_pubkey::new_rand();
430
431        // Add a vote pubkey, the number of references in all_pubkeys
432        // should be 2
433        stats.add_vote_pubkey(vote_pubkey, 1);
434        assert!(stats.propagated_validators.contains(&vote_pubkey));
435        assert_eq!(stats.propagated_validators_stake, 1);
436
437        // Adding it again should change no state since the key already existed
438        stats.add_vote_pubkey(vote_pubkey, 1);
439        assert!(stats.propagated_validators.contains(&vote_pubkey));
440        assert_eq!(stats.propagated_validators_stake, 1);
441
442        // Adding another pubkey should succeed
443        vote_pubkey = solana_pubkey::new_rand();
444        stats.add_vote_pubkey(vote_pubkey, 2);
445        assert!(stats.propagated_validators.contains(&vote_pubkey));
446        assert_eq!(stats.propagated_validators_stake, 3);
447    }
448
449    #[test]
450    fn test_add_node_pubkey_internal() {
451        let num_vote_accounts = 10;
452        let staked_vote_accounts = 5;
453        let vote_account_pubkeys: Vec<_> = std::iter::repeat_with(solana_pubkey::new_rand)
454            .take(num_vote_accounts)
455            .collect();
456        let epoch_vote_accounts: HashMap<_, _> = vote_account_pubkeys
457            .iter()
458            .skip(num_vote_accounts - staked_vote_accounts)
459            .map(|pubkey| (*pubkey, (1, VoteAccount::new_random())))
460            .collect();
461
462        let mut stats = PropagatedStats::default();
463        let mut node_pubkey = solana_pubkey::new_rand();
464
465        // Add a vote pubkey, the number of references in all_pubkeys
466        // should be 2
467        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
468        assert!(stats.propagated_node_ids.contains(&node_pubkey));
469        assert_eq!(
470            stats.propagated_validators_stake,
471            staked_vote_accounts as u64
472        );
473
474        // Adding it again should not change any state
475        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
476        assert!(stats.propagated_node_ids.contains(&node_pubkey));
477        assert_eq!(
478            stats.propagated_validators_stake,
479            staked_vote_accounts as u64
480        );
481
482        // Adding another pubkey with same vote accounts should succeed, but stake
483        // shouldn't increase
484        node_pubkey = solana_pubkey::new_rand();
485        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
486        assert!(stats.propagated_node_ids.contains(&node_pubkey));
487        assert_eq!(
488            stats.propagated_validators_stake,
489            staked_vote_accounts as u64
490        );
491
492        // Adding another pubkey with different vote accounts should succeed
493        // and increase stake
494        node_pubkey = solana_pubkey::new_rand();
495        let vote_account_pubkeys: Vec<_> = std::iter::repeat_with(solana_pubkey::new_rand)
496            .take(num_vote_accounts)
497            .collect();
498        let epoch_vote_accounts: HashMap<_, _> = vote_account_pubkeys
499            .iter()
500            .skip(num_vote_accounts - staked_vote_accounts)
501            .map(|pubkey| (*pubkey, (1, VoteAccount::new_random())))
502            .collect();
503        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
504        assert!(stats.propagated_node_ids.contains(&node_pubkey));
505        assert_eq!(
506            stats.propagated_validators_stake,
507            2 * staked_vote_accounts as u64
508        );
509    }
510
511    #[test]
512    fn test_is_propagated_status_on_construction() {
513        // If the given ValidatorStakeInfo == None, then this is not
514        // a leader slot and is_propagated == false
515        let progress = ForkProgress::new(Hash::default(), Some(9), None, 0, 0);
516        assert!(!progress.propagated_stats.is_propagated);
517
518        // If the stake is zero, then threshold is always achieved
519        let progress = ForkProgress::new(
520            Hash::default(),
521            Some(9),
522            Some(ValidatorStakeInfo {
523                total_epoch_stake: 0,
524                ..ValidatorStakeInfo::default()
525            }),
526            0,
527            0,
528        );
529        assert!(progress.propagated_stats.is_propagated);
530
531        // If the stake is non zero, then threshold is not achieved unless
532        // validator has enough stake by itself to pass threshold
533        let progress = ForkProgress::new(
534            Hash::default(),
535            Some(9),
536            Some(ValidatorStakeInfo {
537                total_epoch_stake: 2,
538                ..ValidatorStakeInfo::default()
539            }),
540            0,
541            0,
542        );
543        assert!(!progress.propagated_stats.is_propagated);
544
545        // Give the validator enough stake by itself to pass threshold
546        let progress = ForkProgress::new(
547            Hash::default(),
548            Some(9),
549            Some(ValidatorStakeInfo {
550                stake: 1,
551                total_epoch_stake: 2,
552                ..ValidatorStakeInfo::default()
553            }),
554            0,
555            0,
556        );
557        assert!(progress.propagated_stats.is_propagated);
558
559        // Check that the default ValidatorStakeInfo::default() constructs a ForkProgress
560        // with is_propagated == false, otherwise propagation tests will fail to run
561        // the proper checks (most will auto-pass without checking anything)
562        let progress = ForkProgress::new(
563            Hash::default(),
564            Some(9),
565            Some(ValidatorStakeInfo::default()),
566            0,
567            0,
568        );
569        assert!(!progress.propagated_stats.is_propagated);
570    }
571
572    #[test]
573    fn test_is_propagated() {
574        let mut progress_map = ProgressMap::default();
575
576        // Insert new ForkProgress for slot 10 (not a leader slot) and its
577        // previous leader slot 9 (leader slot)
578        progress_map.insert(10, ForkProgress::new(Hash::default(), Some(9), None, 0, 0));
579        progress_map.insert(
580            9,
581            ForkProgress::new(
582                Hash::default(),
583                None,
584                Some(ValidatorStakeInfo::default()),
585                0,
586                0,
587            ),
588        );
589
590        // None of these slot have parents which are confirmed
591        assert!(!progress_map.get_leader_propagation_slot_must_exist(9).0);
592        assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);
593
594        // Insert new ForkProgress for slot 8 with no previous leader.
595        // The previous leader before 8, slot 7, does not exist in
596        // progress map, so is_propagated(8) should return true as
597        // this implies the parent is rooted
598        progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None, 0, 0));
599        assert!(progress_map.get_leader_propagation_slot_must_exist(8).0);
600
601        // If we set the is_propagated = true, is_propagated should return true
602        progress_map
603            .get_propagated_stats_mut(9)
604            .unwrap()
605            .is_propagated = true;
606        assert!(progress_map.get_leader_propagation_slot_must_exist(9).0);
607        assert!(progress_map.get(&9).unwrap().propagated_stats.is_propagated);
608
609        // Because slot 9 is now confirmed, then slot 10 is also confirmed b/c 9
610        // is the last leader slot before 10
611        assert!(progress_map.get_leader_propagation_slot_must_exist(10).0);
612
613        // If we make slot 10 a leader slot though, even though its previous
614        // leader slot 9 has been confirmed, slot 10 itself is not confirmed
615        progress_map
616            .get_propagated_stats_mut(10)
617            .unwrap()
618            .is_leader_slot = true;
619        assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);
620    }
621}