solana_core/consensus/
progress_map.rs

1use {
2    crate::{
3        cluster_info_vote_listener::SlotVoteTracker,
4        cluster_slots_service::slot_supporters::SlotSupporters,
5        consensus::{Stake, ThresholdDecision, VotedStakes},
6        replay_stage::SUPERMINORITY_THRESHOLD,
7    },
8    solana_clock::Slot,
9    solana_hash::Hash,
10    solana_ledger::blockstore_processor::{ConfirmationProgress, ReplaySlotStats},
11    solana_pubkey::Pubkey,
12    solana_runtime::{bank::Bank, bank_forks::BankForks},
13    solana_vote::vote_account::VoteAccountsHashMap,
14    std::{
15        collections::{HashMap, HashSet},
16        sync::{Arc, RwLock},
17        time::Instant,
18    },
19};
20
/// Alias for a [`Slot`] used as the `start` of a [`LockoutInterval`].
type VotedSlot = Slot;
/// Alias for a [`Slot`] used as the `end` of a [`LockoutInterval`].
// NOTE(review): presumably the slot at which the vote's lockout expires —
// confirm against the code that builds these intervals.
type ExpirationSlot = Slot;

/// A slot range (`start` to `end`) recorded for a single voter.
// NOTE(review): whether the bounds are inclusive is not established in this
// file — confirm with the consumers of `ForkStats::lockout_intervals`.
#[derive(Clone, Copy, Debug)]
pub struct LockoutInterval {
    /// Pubkey identifying the voter this interval belongs to.
    pub voter: Pubkey,
    /// First slot of the interval (the voted slot).
    pub start: VotedSlot,
    /// Last slot of the interval (the expiration slot).
    pub end: ExpirationSlot,
}

/// List of lockout intervals; stored per fork in `ForkStats::lockout_intervals`.
pub type LockoutIntervals = Vec<LockoutInterval>;
32
/// Stake information passed to [`ForkProgress::new`] for a slot in which this
/// validator is the leader.
///
/// `ForkProgress::new` marks the slot as a leader slot whenever it receives
/// `Some(ValidatorStakeInfo)`, and compares `stake / total_epoch_stake` with
/// `SUPERMINORITY_THRESHOLD` to pick the initial `is_propagated` value.
#[derive(Debug)]
pub struct ValidatorStakeInfo {
    /// Vote account pubkey; seeds `PropagatedStats::propagated_validators`.
    pub validator_vote_pubkey: Pubkey,
    /// Stake credited to `validator_vote_pubkey`; seeds
    /// `PropagatedStats::propagated_validators_stake`.
    pub stake: u64,
    /// Total stake of the epoch. A value of zero makes `ForkProgress::new`
    /// treat the slot as already propagated.
    pub total_epoch_stake: u64,
}
39
40impl Default for ValidatorStakeInfo {
41    fn default() -> Self {
42        Self {
43            stake: 0,
44            validator_vote_pubkey: Pubkey::default(),
45            total_epoch_stake: 1,
46        }
47    }
48}
49
50impl ValidatorStakeInfo {
51    pub fn new(validator_vote_pubkey: Pubkey, stake: u64, total_epoch_stake: u64) -> Self {
52        Self {
53            validator_vote_pubkey,
54            stake,
55            total_epoch_stake,
56        }
57    }
58}
59
/// Wait, in milliseconds, required before the first retransmit retry; doubled
/// for each recorded retry iteration.
pub const RETRANSMIT_BASE_DELAY_MS: u64 = 5_000;
/// Upper bound on the doubling exponent, i.e. the wait never exceeds
/// `RETRANSMIT_BASE_DELAY_MS << RETRANSMIT_BACKOFF_CAP`.
pub const RETRANSMIT_BACKOFF_CAP: u32 = 6;

/// Exponential-backoff bookkeeping for retransmit retries.
#[derive(Debug)]
pub struct RetransmitInfo {
    /// Moment the current wait started (reset on every increment).
    pub(crate) retry_time: Instant,
    /// Number of retries recorded so far; drives the backoff exponent.
    pub(crate) retry_iteration: u32,
}

impl RetransmitInfo {
    /// Returns `true` once strictly more than the current backoff period has
    /// elapsed since `retry_time`.
    ///
    /// The period is `RETRANSMIT_BASE_DELAY_MS * 2^min(retry_iteration, CAP)`,
    /// compared at whole-millisecond granularity.
    pub fn reached_retransmit_threshold(&self) -> bool {
        // Clamp first so the shift below can never exceed the cap.
        let exponent = self.retry_iteration.min(RETRANSMIT_BACKOFF_CAP);
        let required_wait_ms = RETRANSMIT_BASE_DELAY_MS * (1_u64 << exponent);
        let waited_ms = self.retry_time.elapsed().as_millis();
        waited_ms > u128::from(required_wait_ms)
    }

    /// Records one more retry (saturating at `u32::MAX`) and restarts the
    /// wait from the current instant.
    pub fn increment_retry_iteration(&mut self) {
        self.retry_time = Instant::now();
        self.retry_iteration = self.retry_iteration.saturating_add(1);
    }
}
81
/// Per-slot progress record stored in [`ProgressMap`].
pub struct ForkProgress {
    /// Initialized to `false` by [`ForkProgress::new`]; exposed through
    /// `ProgressMap::is_dead`.
    pub is_dead: bool,
    /// Fork statistics; start out as `ForkStats::default()`.
    pub fork_stats: ForkStats,
    /// Propagation tracking for this slot (leader status, observed stake, ...).
    pub propagated_stats: PropagatedStats,
    /// Replay statistics behind a shared lock; start out at their default.
    pub replay_stats: Arc<RwLock<ReplaySlotStats>>,
    /// Replay confirmation progress behind a shared lock; seeded with the
    /// `last_entry` hash given to [`ForkProgress::new`].
    pub replay_progress: Arc<RwLock<ConfirmationProgress>>,
    /// Backoff state for retransmit retries.
    pub retransmit_info: RetransmitInfo,
    // Note `num_blocks_on_fork` and `num_dropped_blocks_on_fork` only
    // count new blocks replayed since last restart, which won't include
    // blocks already existing in the ledger/before snapshot at start,
    // so these stats do not span all of time
    pub num_blocks_on_fork: u64,
    pub num_dropped_blocks_on_fork: u64,
}
96
97impl ForkProgress {
98    pub fn new(
99        last_entry: Hash,
100        prev_leader_slot: Option<Slot>,
101        validator_stake_info: Option<ValidatorStakeInfo>,
102        num_blocks_on_fork: u64,
103        num_dropped_blocks_on_fork: u64,
104    ) -> Self {
105        let (
106            is_leader_slot,
107            propagated_validators_stake,
108            propagated_validators,
109            is_propagated,
110            total_epoch_stake,
111        ) = validator_stake_info
112            .map(|info| {
113                (
114                    true,
115                    info.stake,
116                    vec![info.validator_vote_pubkey].into_iter().collect(),
117                    {
118                        if info.total_epoch_stake == 0 {
119                            true
120                        } else {
121                            info.stake as f64 / info.total_epoch_stake as f64
122                                > SUPERMINORITY_THRESHOLD
123                        }
124                    },
125                    info.total_epoch_stake,
126                )
127            })
128            .unwrap_or((false, 0, HashSet::new(), false, 0));
129
130        Self {
131            is_dead: false,
132            fork_stats: ForkStats::default(),
133            replay_stats: Arc::new(RwLock::new(ReplaySlotStats::default())),
134            replay_progress: Arc::new(RwLock::new(ConfirmationProgress::new(last_entry))),
135            num_blocks_on_fork,
136            num_dropped_blocks_on_fork,
137            propagated_stats: PropagatedStats {
138                propagated_validators,
139                propagated_validators_stake,
140                is_propagated,
141                is_leader_slot,
142                prev_leader_slot,
143                total_epoch_stake,
144                ..PropagatedStats::default()
145            },
146            retransmit_info: RetransmitInfo {
147                retry_time: Instant::now(),
148                retry_iteration: 0u32,
149            },
150        }
151    }
152
153    pub fn new_from_bank(
154        bank: &Bank,
155        validator_identity: &Pubkey,
156        validator_vote_pubkey: &Pubkey,
157        prev_leader_slot: Option<Slot>,
158        num_blocks_on_fork: u64,
159        num_dropped_blocks_on_fork: u64,
160    ) -> Self {
161        let validator_stake_info = {
162            if bank.collector_id() == validator_identity {
163                Some(ValidatorStakeInfo::new(
164                    *validator_vote_pubkey,
165                    bank.epoch_vote_account_stake(validator_vote_pubkey),
166                    bank.total_epoch_stake(),
167                ))
168            } else {
169                None
170            }
171        };
172
173        let mut new_progress = Self::new(
174            bank.last_blockhash(),
175            prev_leader_slot,
176            validator_stake_info,
177            num_blocks_on_fork,
178            num_dropped_blocks_on_fork,
179        );
180
181        if bank.is_frozen() {
182            new_progress.fork_stats.bank_hash = Some(bank.hash());
183        }
184        new_progress
185    }
186}
187
/// Statistics tracked for a fork, stored in `ForkProgress::fork_stats`.
///
/// All fields start at their `Default` value when a `ForkProgress` is created.
/// Most of them are written by code outside this file; only the ones touched
/// here are documented individually.
#[derive(Debug, Clone, Default)]
pub struct ForkStats {
    /// Numerator of [`ForkStats::fork_weight`].
    pub fork_stake: Stake,
    /// Denominator of [`ForkStats::fork_weight`].
    pub total_stake: Stake,
    pub block_height: u64,
    pub has_voted: bool,
    pub is_recent: bool,
    pub is_empty: bool,
    pub vote_threshold: Vec<ThresholdDecision>,
    pub is_locked_out: bool,
    pub voted_stakes: VotedStakes,
    /// Set by `ProgressMap::set_duplicate_confirmed_hash`; `Some` means
    /// `ProgressMap::is_duplicate_confirmed` reports `true` for the slot.
    pub duplicate_confirmed_hash: Option<Hash>,
    pub computed: bool,
    pub lockout_intervals: LockoutIntervals,
    /// Hash of the slot's bank; filled in by `ForkProgress::new_from_bank`
    /// when the bank is already frozen, and returned by `ProgressMap::get_hash`.
    pub bank_hash: Option<Hash>,
    /// Returned by `ProgressMap::my_latest_landed_vote`.
    pub my_latest_landed_vote: Option<Slot>,
}
205
206impl ForkStats {
207    /// Return fork_weight, i.e. bank_stake over total_stake.
208    pub fn fork_weight(&self) -> f64 {
209        self.fork_stake as f64 / self.total_stake as f64
210    }
211}
212
/// Tracks which validators have been observed for a slot and how much stake
/// they represent; stored in `ForkProgress::propagated_stats`.
#[derive(Clone, Default)]
pub struct PropagatedStats {
    /// Vote account pubkeys recorded via `add_vote_pubkey`.
    pub propagated_validators: HashSet<Pubkey>,
    /// Node pubkeys recorded via `add_node_pubkey`.
    pub propagated_node_ids: HashSet<Pubkey>,
    /// Sum of the stake passed to `add_vote_pubkey` for each distinct vote
    /// pubkey in `propagated_validators`.
    pub propagated_validators_stake: u64,
    /// Whether the slot is considered propagated. Initialized by
    /// `ForkProgress::new`; not updated by any method in this file.
    pub is_propagated: bool,
    /// `true` when the `ForkProgress` was created with validator stake info,
    /// i.e. the slot is one of this validator's leader slots.
    pub is_leader_slot: bool,
    /// Latest leader slot of this validator before this slot, if any.
    pub prev_leader_slot: Option<Slot>,
    /// Left at `None` by `ForkProgress::new`; populated outside this file.
    pub slot_vote_tracker: Option<Arc<RwLock<SlotVoteTracker>>>,
    /// Left at `None` by `ForkProgress::new`; populated outside this file.
    pub cluster_slot_pubkeys: Option<Arc<SlotSupporters>>,
    /// Total epoch stake copied from `ValidatorStakeInfo` (zero for
    /// non-leader slots).
    pub total_epoch_stake: u64,
}
225
226impl PropagatedStats {
227    pub fn add_vote_pubkey(&mut self, vote_pubkey: Pubkey, stake: u64) {
228        if self.propagated_validators.insert(vote_pubkey) {
229            self.propagated_validators_stake += stake;
230        }
231    }
232
233    pub fn add_node_pubkey(&mut self, node_pubkey: &Pubkey, bank: &Bank) {
234        if !self.propagated_node_ids.contains(node_pubkey) {
235            let node_vote_accounts = bank
236                .epoch_vote_accounts_for_node_id(node_pubkey)
237                .map(|v| &v.vote_accounts);
238
239            if let Some(node_vote_accounts) = node_vote_accounts {
240                self.add_node_pubkey_internal(
241                    node_pubkey,
242                    node_vote_accounts,
243                    bank.epoch_vote_accounts(bank.epoch())
244                        .expect("Epoch stakes for bank's own epoch must exist"),
245                );
246            }
247        }
248    }
249
250    fn add_node_pubkey_internal(
251        &mut self,
252        node_pubkey: &Pubkey,
253        vote_account_pubkeys: &[Pubkey],
254        epoch_vote_accounts: &VoteAccountsHashMap,
255    ) {
256        self.propagated_node_ids.insert(*node_pubkey);
257        for vote_account_pubkey in vote_account_pubkeys.iter() {
258            let stake = epoch_vote_accounts
259                .get(vote_account_pubkey)
260                .map(|(stake, _)| *stake)
261                .unwrap_or(0);
262            self.add_vote_pubkey(*vote_account_pubkey, stake);
263        }
264    }
265}
266
/// Map from slot to that slot's [`ForkProgress`].
///
/// Wraps a `HashMap` and dereferences to it, so the full map API is available
/// in addition to the slot-oriented accessors defined on this type.
#[derive(Default)]
pub struct ProgressMap {
    // Backing storage, keyed by slot.
    progress_map: HashMap<Slot, ForkProgress>,
}
271
/// Gives read access to the underlying `HashMap` API (`get`, `len`, `iter`, ...).
impl std::ops::Deref for ProgressMap {
    type Target = HashMap<Slot, ForkProgress>;
    /// Returns a shared reference to the backing map.
    fn deref(&self) -> &Self::Target {
        &self.progress_map
    }
}
278
/// Gives mutable access to the underlying `HashMap` API (`get_mut`, `remove`, ...).
impl std::ops::DerefMut for ProgressMap {
    /// Returns an exclusive reference to the backing map.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.progress_map
    }
}
284
285impl ProgressMap {
286    pub fn insert(&mut self, slot: Slot, fork_progress: ForkProgress) {
287        self.progress_map.insert(slot, fork_progress);
288    }
289
290    pub fn get_propagated_stats(&self, slot: Slot) -> Option<&PropagatedStats> {
291        self.progress_map
292            .get(&slot)
293            .map(|fork_progress| &fork_progress.propagated_stats)
294    }
295
296    pub fn get_propagated_stats_mut(&mut self, slot: Slot) -> Option<&mut PropagatedStats> {
297        self.progress_map
298            .get_mut(&slot)
299            .map(|fork_progress| &mut fork_progress.propagated_stats)
300    }
301
302    pub fn get_propagated_stats_must_exist(&self, slot: Slot) -> &PropagatedStats {
303        self.get_propagated_stats(slot)
304            .unwrap_or_else(|| panic!("slot={slot} must exist in ProgressMap"))
305    }
306
307    pub fn get_fork_stats(&self, slot: Slot) -> Option<&ForkStats> {
308        self.progress_map
309            .get(&slot)
310            .map(|fork_progress| &fork_progress.fork_stats)
311    }
312
313    pub fn get_fork_stats_mut(&mut self, slot: Slot) -> Option<&mut ForkStats> {
314        self.progress_map
315            .get_mut(&slot)
316            .map(|fork_progress| &mut fork_progress.fork_stats)
317    }
318
319    pub fn get_retransmit_info(&self, slot: Slot) -> Option<&RetransmitInfo> {
320        self.progress_map
321            .get(&slot)
322            .map(|fork_progress| &fork_progress.retransmit_info)
323    }
324
325    pub fn get_retransmit_info_mut(&mut self, slot: Slot) -> Option<&mut RetransmitInfo> {
326        self.progress_map
327            .get_mut(&slot)
328            .map(|fork_progress| &mut fork_progress.retransmit_info)
329    }
330
331    pub fn is_dead(&self, slot: Slot) -> Option<bool> {
332        self.progress_map
333            .get(&slot)
334            .map(|fork_progress| fork_progress.is_dead)
335    }
336
337    pub fn get_hash(&self, slot: Slot) -> Option<Hash> {
338        self.progress_map
339            .get(&slot)
340            .and_then(|fork_progress| fork_progress.fork_stats.bank_hash)
341    }
342
343    pub fn is_propagated(&self, slot: Slot) -> Option<bool> {
344        self.get_propagated_stats(slot)
345            .map(|stats| stats.is_propagated)
346    }
347
348    pub fn get_latest_leader_slot_must_exist(&self, slot: Slot) -> Option<Slot> {
349        let propagated_stats = self.get_propagated_stats_must_exist(slot);
350        if propagated_stats.is_leader_slot {
351            Some(slot)
352        } else {
353            propagated_stats.prev_leader_slot
354        }
355    }
356
357    pub fn get_leader_propagation_slot_must_exist(&self, slot: Slot) -> (bool, Option<Slot>) {
358        if let Some(leader_slot) = self.get_latest_leader_slot_must_exist(slot) {
359            // If the leader's stats are None (isn't in the
360            // progress map), this means that prev_leader slot is
361            // rooted, so return true
362            (
363                self.is_propagated(leader_slot).unwrap_or(true),
364                Some(leader_slot),
365            )
366        } else {
367            // prev_leader_slot doesn't exist because already rooted
368            // or this validator hasn't been scheduled as a leader
369            // yet. In both cases the latest leader is vacuously
370            // confirmed
371            (true, None)
372        }
373    }
374
375    pub fn my_latest_landed_vote(&self, slot: Slot) -> Option<Slot> {
376        self.progress_map
377            .get(&slot)
378            .and_then(|s| s.fork_stats.my_latest_landed_vote)
379    }
380
381    pub fn set_duplicate_confirmed_hash(&mut self, slot: Slot, hash: Hash) {
382        let slot_progress = self.get_mut(&slot).unwrap();
383        slot_progress.fork_stats.duplicate_confirmed_hash = Some(hash);
384    }
385
386    pub fn is_duplicate_confirmed(&self, slot: Slot) -> Option<bool> {
387        self.progress_map
388            .get(&slot)
389            .map(|s| s.fork_stats.duplicate_confirmed_hash.is_some())
390    }
391
392    pub fn get_bank_prev_leader_slot(&self, bank: &Bank) -> Option<Slot> {
393        let parent_slot = bank.parent_slot();
394        self.get_propagated_stats(parent_slot)
395            .map(|stats| {
396                if stats.is_leader_slot {
397                    Some(parent_slot)
398                } else {
399                    stats.prev_leader_slot
400                }
401            })
402            .unwrap_or(None)
403    }
404
405    pub fn handle_new_root(&mut self, bank_forks: &BankForks) {
406        self.progress_map
407            .retain(|k, _| bank_forks.get(*k).is_some());
408    }
409
410    pub fn log_propagated_stats(&self, slot: Slot, bank_forks: &RwLock<BankForks>) {
411        if let Some(stats) = self.get_propagated_stats(slot) {
412            info!(
413                "Propagated stats: total staked: {}, observed staked: {}, vote pubkeys: {:?}, \
414                 node_pubkeys: {:?}, slot: {slot}, epoch: {:?}",
415                stats.total_epoch_stake,
416                stats.propagated_validators_stake,
417                stats.propagated_validators,
418                stats.propagated_node_ids,
419                bank_forks.read().unwrap().get(slot).map(|x| x.epoch()),
420            );
421        }
422    }
423}
424
// Unit tests for `PropagatedStats` stake accounting, the initial
// `is_propagated` value computed by `ForkProgress::new`, and the leader
// propagation lookup on `ProgressMap`.
#[cfg(test)]
mod test {
    use {super::*, solana_vote::vote_account::VoteAccount};

    #[test]
    fn test_add_vote_pubkey() {
        let mut stats = PropagatedStats::default();
        let mut vote_pubkey = solana_pubkey::new_rand();

        // Add a vote pubkey: it should be recorded and its stake counted
        stats.add_vote_pubkey(vote_pubkey, 1);
        assert!(stats.propagated_validators.contains(&vote_pubkey));
        assert_eq!(stats.propagated_validators_stake, 1);

        // Adding it again should change no state since the key already existed
        stats.add_vote_pubkey(vote_pubkey, 1);
        assert!(stats.propagated_validators.contains(&vote_pubkey));
        assert_eq!(stats.propagated_validators_stake, 1);

        // Adding another pubkey should succeed
        vote_pubkey = solana_pubkey::new_rand();
        stats.add_vote_pubkey(vote_pubkey, 2);
        assert!(stats.propagated_validators.contains(&vote_pubkey));
        assert_eq!(stats.propagated_validators_stake, 3);
    }

    #[test]
    fn test_add_node_pubkey_internal() {
        let num_vote_accounts = 10;
        let staked_vote_accounts = 5;
        let vote_account_pubkeys: Vec<_> = std::iter::repeat_with(solana_pubkey::new_rand)
            .take(num_vote_accounts)
            .collect();
        // Only the last `staked_vote_accounts` pubkeys get an entry (stake 1
        // each); the others are absent and therefore count as zero stake
        let epoch_vote_accounts: HashMap<_, _> = vote_account_pubkeys
            .iter()
            .skip(num_vote_accounts - staked_vote_accounts)
            .map(|pubkey| (*pubkey, (1, VoteAccount::new_random())))
            .collect();

        let mut stats = PropagatedStats::default();
        let mut node_pubkey = solana_pubkey::new_rand();

        // Add a node pubkey: it should be recorded, and only its staked vote
        // accounts should contribute stake
        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
        assert!(stats.propagated_node_ids.contains(&node_pubkey));
        assert_eq!(
            stats.propagated_validators_stake,
            staked_vote_accounts as u64
        );

        // Adding it again should not change any state
        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
        assert!(stats.propagated_node_ids.contains(&node_pubkey));
        assert_eq!(
            stats.propagated_validators_stake,
            staked_vote_accounts as u64
        );

        // Adding another pubkey with same vote accounts should succeed, but stake
        // shouldn't increase
        node_pubkey = solana_pubkey::new_rand();
        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
        assert!(stats.propagated_node_ids.contains(&node_pubkey));
        assert_eq!(
            stats.propagated_validators_stake,
            staked_vote_accounts as u64
        );

        // Adding another pubkey with different vote accounts should succeed
        // and increase stake
        node_pubkey = solana_pubkey::new_rand();
        let vote_account_pubkeys: Vec<_> = std::iter::repeat_with(solana_pubkey::new_rand)
            .take(num_vote_accounts)
            .collect();
        let epoch_vote_accounts: HashMap<_, _> = vote_account_pubkeys
            .iter()
            .skip(num_vote_accounts - staked_vote_accounts)
            .map(|pubkey| (*pubkey, (1, VoteAccount::new_random())))
            .collect();
        stats.add_node_pubkey_internal(&node_pubkey, &vote_account_pubkeys, &epoch_vote_accounts);
        assert!(stats.propagated_node_ids.contains(&node_pubkey));
        assert_eq!(
            stats.propagated_validators_stake,
            2 * staked_vote_accounts as u64
        );
    }

    #[test]
    fn test_is_propagated_status_on_construction() {
        // If the given ValidatorStakeInfo == None, then this is not
        // a leader slot and is_propagated == false
        let progress = ForkProgress::new(Hash::default(), Some(9), None, 0, 0);
        assert!(!progress.propagated_stats.is_propagated);

        // If the total epoch stake is zero, then threshold is always achieved
        let progress = ForkProgress::new(
            Hash::default(),
            Some(9),
            Some(ValidatorStakeInfo {
                total_epoch_stake: 0,
                ..ValidatorStakeInfo::default()
            }),
            0,
            0,
        );
        assert!(progress.propagated_stats.is_propagated);

        // If the total epoch stake is non zero, then threshold is not achieved
        // unless validator has enough stake by itself to pass threshold
        let progress = ForkProgress::new(
            Hash::default(),
            Some(9),
            Some(ValidatorStakeInfo {
                total_epoch_stake: 2,
                ..ValidatorStakeInfo::default()
            }),
            0,
            0,
        );
        assert!(!progress.propagated_stats.is_propagated);

        // Give the validator enough stake by itself to pass threshold
        let progress = ForkProgress::new(
            Hash::default(),
            Some(9),
            Some(ValidatorStakeInfo {
                stake: 1,
                total_epoch_stake: 2,
                ..ValidatorStakeInfo::default()
            }),
            0,
            0,
        );
        assert!(progress.propagated_stats.is_propagated);

        // Check that the default ValidatorStakeInfo::default() constructs a ForkProgress
        // with is_propagated == false, otherwise propagation tests will fail to run
        // the proper checks (most will auto-pass without checking anything)
        let progress = ForkProgress::new(
            Hash::default(),
            Some(9),
            Some(ValidatorStakeInfo::default()),
            0,
            0,
        );
        assert!(!progress.propagated_stats.is_propagated);
    }

    #[test]
    fn test_is_propagated() {
        let mut progress_map = ProgressMap::default();

        // Insert new ForkProgress for slot 10 (not a leader slot) and its
        // previous leader slot 9 (leader slot)
        progress_map.insert(10, ForkProgress::new(Hash::default(), Some(9), None, 0, 0));
        progress_map.insert(
            9,
            ForkProgress::new(
                Hash::default(),
                None,
                Some(ValidatorStakeInfo::default()),
                0,
                0,
            ),
        );

        // For both slots the latest leader slot is 9, which is not yet
        // propagated
        assert!(!progress_map.get_leader_propagation_slot_must_exist(9).0);
        assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);

        // Insert new ForkProgress for slot 8, a non-leader slot whose
        // previous leader slot is 7. Slot 7 does not exist in the progress
        // map, which implies it is rooted, so the lookup for slot 8 should
        // report propagated
        progress_map.insert(8, ForkProgress::new(Hash::default(), Some(7), None, 0, 0));
        assert!(progress_map.get_leader_propagation_slot_must_exist(8).0);

        // If we set the is_propagated = true, is_propagated should return true
        progress_map
            .get_propagated_stats_mut(9)
            .unwrap()
            .is_propagated = true;
        assert!(progress_map.get_leader_propagation_slot_must_exist(9).0);
        assert!(progress_map.get(&9).unwrap().propagated_stats.is_propagated);

        // Because slot 9 is now confirmed, then slot 10 is also confirmed b/c 9
        // is the last leader slot before 10
        assert!(progress_map.get_leader_propagation_slot_must_exist(10).0);

        // If we make slot 10 a leader slot though, even though its previous
        // leader slot 9 has been confirmed, slot 10 itself is not confirmed
        progress_map
            .get_propagated_stats_mut(10)
            .unwrap()
            .is_leader_slot = true;
        assert!(!progress_map.get_leader_propagation_slot_must_exist(10).0);
    }
}