solana_core/cluster_slots_service/
cluster_slots.rs

1/// ClusterSlots object holds information about which validators have confirmed which slots
2/// via EpochSlots mechanism. Periodically, EpochSlots get sent into here via update method.
3/// The ClusterSlots datastructure maintains a shadow copy of the stake info for current and
4/// upcoming epochs, and a ringbuffer of per-slot records. The ringbuffer may contain blank
5/// records if stake information is not available (can happen for very short epochs).
6use {
7    crate::{cluster_slots_service::slot_supporters::SlotSupporters, consensus::Stake},
8    solana_clock::{Epoch, Slot},
9    solana_epoch_schedule::EpochSchedule,
10    solana_gossip::{
11        cluster_info::ClusterInfo, contact_info::ContactInfo, crds::Cursor, epoch_slots::EpochSlots,
12    },
13    solana_pubkey::Pubkey,
14    solana_runtime::{bank::Bank, epoch_stakes::VersionedEpochStakes},
15    solana_time_utils::AtomicInterval,
16    std::{
17        collections::{HashMap, VecDeque},
18        hash::RandomState,
19        ops::Range,
20        sync::{
21            atomic::{AtomicU64, Ordering},
22            Arc, Mutex, RwLock,
23        },
24    },
25};
26
27// Limit the size of cluster-slots map in case
28// of receiving bogus epoch slots values.
29// This also constraints the size of the datastructure
30// if we are really really far behind.
31const CLUSTER_SLOTS_TRIM_SIZE: usize = 50000;
32
33//This is intended to be switched to solana_pubkey::PubkeyHasherBuilder
34type PubkeyHasherBuilder = RandomState;
35pub(crate) type ValidatorStakesMap = HashMap<Pubkey, Stake, PubkeyHasherBuilder>;
36
37/// Static snapshot of the information about a given epoch's stake distribution.
38struct EpochStakeInfo {
39    validator_stakes: Arc<ValidatorStakesMap>,
40    pubkey_to_index: Arc<HashMap<Pubkey, usize>>,
41    /// total amount of stake across all validators in `validator_stakes`.
42    total_stake: Stake,
43}
44
45impl From<&VersionedEpochStakes> for EpochStakeInfo {
46    fn from(stakes: &VersionedEpochStakes) -> Self {
47        let validator_stakes = ValidatorStakesMap::from_iter(
48            stakes
49                .node_id_to_vote_accounts()
50                .iter()
51                .map(|(k, v)| (*k, v.total_stake)),
52        );
53        Self::new(validator_stakes, stakes.total_stake())
54    }
55}
56
57impl EpochStakeInfo {
58    fn new(validator_stakes: HashMap<Pubkey, Stake>, total_stake: Stake) -> Self {
59        let pubkey_to_index: HashMap<Pubkey, usize, PubkeyHasherBuilder> = validator_stakes
60            .keys()
61            .enumerate()
62            .map(|(v, &k)| (k, v))
63            .collect();
64        EpochStakeInfo {
65            validator_stakes: Arc::new(validator_stakes),
66            pubkey_to_index: Arc::new(pubkey_to_index),
67            total_stake,
68        }
69    }
70}
71
72/// Holds schedule of the epoch where root bank currently sits
73struct RootEpoch {
74    number: Epoch,
75    schedule: EpochSchedule,
76}
77#[derive(Default)]
78pub struct ClusterSlots {
79    // ring buffer storing, per slot, which stakes were committed to a certain slot.
80    cluster_slots: RwLock<VecDeque<RowContent>>,
81    // a cache of validator stakes for reuse internally, updated at epoch boundary.
82    epoch_metadata: RwLock<HashMap<Epoch, EpochStakeInfo>>,
83    current_slot: AtomicU64, // current slot at the front of ringbuffer.
84    root_epoch: RwLock<Option<RootEpoch>>, // epoch where root bank is
85    cursor: Mutex<Cursor>,   // cursor to read CRDS.
86    metrics_last_report: AtomicInterval, // last time statistics were reported.
87    metric_allocations: AtomicU64, // total amount of memory allocations made.
88    metric_write_locks: AtomicU64, // total amount of write locks taken outside of initialization.
89}
90
91#[derive(Debug)]
92struct RowContent {
93    slot: Slot, // slot for which this row stores information
94    supporters: Arc<SlotSupporters>,
95}
96
97impl ClusterSlots {
98    #[inline]
99    pub(crate) fn lookup(&self, slot: Slot) -> Option<Arc<SlotSupporters>> {
100        let cluster_slots = self.cluster_slots.read().unwrap();
101
102        let row = Self::get_row_for_slot(slot, &cluster_slots)?;
103        if row.supporters.is_blank() {
104            None
105        } else {
106            Some(row.supporters.clone())
107        }
108    }
109
110    #[inline]
111    fn get_row_for_slot(slot: Slot, cluster_slots: &VecDeque<RowContent>) -> Option<&RowContent> {
112        let start = cluster_slots.front()?.slot;
113        if slot < start {
114            return None;
115        }
116        let idx = slot - start;
117        cluster_slots.get(idx as usize)
118    }
119
120    #[inline]
121    fn get_row_for_slot_mut(
122        slot: Slot,
123        cluster_slots: &mut VecDeque<RowContent>,
124    ) -> Option<&mut RowContent> {
125        let start = cluster_slots.front()?.slot;
126        if slot < start {
127            return None;
128        }
129        let idx = slot - start;
130        cluster_slots.get_mut(idx as usize)
131    }
132
133    pub(crate) fn update(&self, root_bank: &Bank, cluster_info: &ClusterInfo) {
134        let root_slot = root_bank.slot();
135        let current_slot = self.get_current_slot();
136        if current_slot > root_slot + 1 {
137            error!("Invalid update call to ClusterSlots, can not roll time backwards!");
138            return;
139        }
140        let root_epoch = root_bank.epoch();
141        if self.need_to_update_epoch(root_epoch) {
142            self.update_epoch_info(root_bank);
143        }
144
145        let epoch_slots = {
146            let mut cursor = self.cursor.lock().unwrap();
147            cluster_info.get_epoch_slots(&mut cursor)
148        };
149        self.update_internal(root_slot, epoch_slots);
150        self.maybe_report_cluster_slots_perf_stats();
151    }
152
153    fn need_to_update_epoch(&self, root_epoch: Epoch) -> bool {
154        let rg = self.root_epoch.read().unwrap();
155        let my_epoch = rg.as_ref().map(|v| v.number);
156        Some(root_epoch) != my_epoch
157    }
158
159    // call this to update internal datastructures for current and next epoch
160    fn update_epoch_info(&self, root_bank: &Bank) {
161        let root_epoch = root_bank.epoch();
162        info!("Updating epoch_metadata for epoch {root_epoch}");
163        let epoch_stakes_map = root_bank.epoch_stakes_map();
164
165        {
166            let mut epoch_metadata = self.epoch_metadata.write().unwrap();
167            // check if we need to do any cleanup in the epoch_metadata
168            {
169                if let Some(my_epoch) = self.get_epoch_for_slot(self.get_current_slot()) {
170                    info!("Evicting epoch_metadata for epoch {my_epoch}");
171                    epoch_metadata.remove(&my_epoch);
172                }
173            }
174            // Next we fetch info about current and upcoming epoch's stakes
175            epoch_metadata.insert(
176                root_epoch,
177                EpochStakeInfo::from(&epoch_stakes_map[&root_epoch]),
178            );
179        }
180        *self.root_epoch.write().unwrap() = Some(RootEpoch {
181            schedule: root_bank.epoch_schedule().clone(),
182            number: root_epoch,
183        });
184
185        let next_epoch = root_epoch.wrapping_add(1);
186        let next_epoch_info = EpochStakeInfo::from(&epoch_stakes_map[&next_epoch]);
187        let mut first_slot = root_bank
188            .epoch_schedule()
189            .get_first_slot_in_epoch(next_epoch);
190        let mut cluster_slots = self.cluster_slots.write().unwrap();
191        let mut patched = 0;
192        loop {
193            let row = Self::get_row_for_slot_mut(first_slot, &mut cluster_slots);
194            let Some(row) = row else {
195                break; // reached the end of ringbuffer and/or ringbuffer is not initialized
196            };
197            // rows for this epoch are not initialized, initialize them now
198            if row.supporters.is_blank() {
199                patched += 1;
200                row.supporters = Arc::new(SlotSupporters::new(
201                    next_epoch_info.total_stake,
202                    next_epoch_info.pubkey_to_index.clone(),
203                ));
204            } else {
205                // if any rows for this epoch are initialized, they all should be
206                break;
207            }
208            first_slot = first_slot.wrapping_add(1);
209        }
210        if patched > 0 {
211            warn!("Finalized init for {patched} slots in epoch {next_epoch}");
212        }
213        let mut epoch_metadata = self.epoch_metadata.write().unwrap();
214        epoch_metadata.insert(next_epoch, next_epoch_info);
215    }
216    #[cfg(test)]
217    pub(crate) fn fake_epoch_info_for_tests(&self, validator_stakes: ValidatorStakesMap) {
218        assert!(
219            self.root_epoch.read().unwrap().is_none(),
220            "Can not use fake epoch initialization more than once!"
221        );
222        let sched = EpochSchedule::without_warmup();
223        *self.root_epoch.write().unwrap() = Some(RootEpoch {
224            schedule: sched,
225            number: 0,
226        });
227        let total_stake = validator_stakes.values().sum();
228        let mut epoch_metadata = self.epoch_metadata.write().unwrap();
229        epoch_metadata.insert(0, EpochStakeInfo::new(validator_stakes, total_stake));
230    }
231
232    /// Advance the cluster_slots ringbuffer, initialize if needed.
233    /// We will discard slots at or before current root or too far ahead.
234    fn roll_cluster_slots(&self, root: Slot) -> Range<Slot> {
235        let slot_range = (root + 1)..root.saturating_add(CLUSTER_SLOTS_TRIM_SIZE as u64 + 1);
236        let current_slot = self.current_slot.load(Ordering::Relaxed);
237        // early-return if no slot change happened
238        if current_slot == slot_range.start {
239            return slot_range;
240        }
241        assert!(
242            slot_range.start > current_slot,
243            "Can not roll cluster slots backwards!"
244        );
245        let mut cluster_slots = self.cluster_slots.write().unwrap();
246        self.metric_write_locks.fetch_add(1, Ordering::Relaxed);
247        let epoch_metadata = self.epoch_metadata.read().unwrap();
248        //startup init, this is very slow but only ever happens once
249        if cluster_slots.is_empty() {
250            info!("Init cluster_slots at range {slot_range:?}");
251            for slot in slot_range.clone() {
252                // Epoch should be defined for all slots in the window
253                let epoch = self
254                    .get_epoch_for_slot(slot)
255                    .expect("Epoch should be defined for all slots");
256                let supporters = if let Some(epoch_data) = epoch_metadata.get(&epoch) {
257                    SlotSupporters::new(epoch_data.total_stake, epoch_data.pubkey_to_index.clone())
258                } else {
259                    // we should be able to initialize at least current epoch right away
260                    if epoch
261                        == self
262                            .get_epoch_for_slot(current_slot)
263                            .expect("Epochs should be defined")
264                    {
265                        panic!(
266                            "Epoch slots can not find stake info for slot {slot} in epoch {epoch}"
267                        );
268                    }
269                    SlotSupporters::new_blank()
270                };
271                self.metric_allocations.fetch_add(1, Ordering::Relaxed);
272                cluster_slots.push_back(RowContent {
273                    slot,
274                    supporters: Arc::new(supporters),
275                });
276            }
277        }
278        // discard and recycle outdated elements
279        loop {
280            let RowContent { slot, .. } = cluster_slots
281                .front()
282                .expect("After initialization the ring buffer can not be empty");
283            // stop once we reach a slot in the valid range
284            if *slot >= slot_range.start {
285                break;
286            }
287            // pop useless record from the front
288            let RowContent { supporters, .. } = cluster_slots.pop_front().unwrap();
289            // try to reuse its map allocation at the back of the datastructure
290            let slot = cluster_slots.back().unwrap().slot + 1;
291
292            let epoch = self
293                .get_epoch_for_slot(slot)
294                .expect("Epoch should be defined for all slots in the window");
295            let Some(stake_info) = epoch_metadata.get(&epoch) else {
296                warn!(
297                    "Epoch slots can not reuse slot entry for slot {slot} since stakes for epoch \
298                     {epoch} are not available"
299                );
300                cluster_slots.push_back(RowContent {
301                    slot,
302                    supporters: Arc::new(SlotSupporters::new_blank()),
303                });
304                break;
305            };
306
307            let new_supporters = match Arc::try_unwrap(supporters) {
308                Ok(supporters) => {
309                    supporters.recycle(stake_info.total_stake, &stake_info.pubkey_to_index)
310                }
311                // if we can not reuse just allocate a new one =(
312                Err(_) => {
313                    self.metric_allocations.fetch_add(1, Ordering::Relaxed);
314                    SlotSupporters::new(stake_info.total_stake, stake_info.pubkey_to_index.clone())
315                }
316            };
317            cluster_slots.push_back(RowContent {
318                slot,
319                supporters: Arc::new(new_supporters),
320            });
321        }
322        debug_assert!(
323            cluster_slots.len() == CLUSTER_SLOTS_TRIM_SIZE,
324            "Ring buffer should be exactly the intended size"
325        );
326        self.current_slot.store(slot_range.start, Ordering::Relaxed);
327        slot_range
328    }
329
330    fn update_internal(&self, root: Slot, epoch_slots_list: Vec<EpochSlots>) {
331        // Adjust the range of slots we can store in the datastructure to the
332        // current rooted slot, ensure the datastructure has the correct window in scope
333        let slot_range = self.roll_cluster_slots(root);
334
335        let epoch_metadata = self.epoch_metadata.read().unwrap();
336        let cluster_slots = self.cluster_slots.read().unwrap();
337        for epoch_slots in epoch_slots_list {
338            let Some(first_slot) = epoch_slots.first_slot() else {
339                continue;
340            };
341            let Some(epoch) = self.get_epoch_for_slot(first_slot) else {
342                continue;
343            };
344            let Some(epoch_meta) = epoch_metadata.get(&epoch) else {
345                continue;
346            };
347            //filter out unstaked nodes
348            let Some(&sender_stake) = epoch_meta.validator_stakes.get(&epoch_slots.from) else {
349                continue;
350            };
351            let updates = epoch_slots
352                .to_slots(root)
353                .filter(|slot| slot_range.contains(slot));
354            // figure out which entries would get updated by the new message and cache them
355            for slot in updates {
356                let RowContent {
357                    slot: s,
358                    supporters: map,
359                } = Self::get_row_for_slot(slot, &cluster_slots).unwrap();
360                debug_assert_eq!(*s, slot, "Fetched slot does not match expected value!");
361                if map.is_frozen() {
362                    continue;
363                }
364                if map
365                    .set_support_by_pubkey(&epoch_slots.from, sender_stake)
366                    .is_err()
367                {
368                    error!("Unexpected pubkey {} for slot {}!", &epoch_slots.from, slot);
369                    break;
370                }
371            }
372        }
373    }
374
375    /// Upload stats into metrics database
376    fn maybe_report_cluster_slots_perf_stats(&self) {
377        if self.metrics_last_report.should_update(10_000) {
378            let write_locks = self.metric_write_locks.swap(0, Ordering::Relaxed);
379            let allocations = self.metric_allocations.swap(0, Ordering::Relaxed);
380            let cluster_slots = self.cluster_slots.read().unwrap();
381            let (size, frozen, blank) =
382                cluster_slots
383                    .iter()
384                    .fold((0, 0, 0), |(s, f, b), RowContent { supporters, .. }| {
385                        (
386                            s + supporters.memory_usage(),
387                            f + supporters.is_frozen() as usize,
388                            b + supporters.is_blank() as usize,
389                        )
390                    });
391            datapoint_info!(
392                "cluster-slots-size",
393                ("total_entries", size as i64, i64),
394                ("frozen_entries", frozen as i64, i64),
395                ("blank_entries", blank as i64, i64),
396                ("write_locks", write_locks as i64, i64),
397                ("total_allocations", allocations as i64, i64),
398            );
399        }
400    }
401    fn get_current_slot(&self) -> Slot {
402        self.current_slot.load(Ordering::Relaxed)
403    }
404
405    fn with_root_epoch<T>(&self, closure: impl FnOnce(&RootEpoch) -> T) -> Option<T> {
406        let rg = self.root_epoch.read().unwrap();
407        rg.as_ref().map(closure)
408    }
409
410    fn get_epoch_for_slot(&self, slot: Slot) -> Option<u64> {
411        self.with_root_epoch(|b| b.schedule.get_epoch_and_slot_index(slot).0)
412    }
413
414    #[cfg(test)]
415    // patches the given node_id into the internal structures
416    // to pretend as if it has submitted epoch slots for a given slot.
417    // If the node was not previosly registered in validator_stakes,
418    // an override_stake amount should be provided.
419    pub(crate) fn insert_node_id(&self, slot: Slot, node_id: Pubkey) {
420        let mut epoch_slot = EpochSlots {
421            from: node_id,
422            ..Default::default()
423        };
424        epoch_slot.fill(&[slot], 0);
425        let current_root = self.current_slot.load(Ordering::Relaxed);
426        self.update_internal(current_root, vec![epoch_slot]);
427    }
428
429    pub(crate) fn compute_weights(&self, slot: Slot, repair_peers: &[ContactInfo]) -> Vec<u64> {
430        if repair_peers.is_empty() {
431            return vec![];
432        }
433        let stakes = {
434            let failsafe = std::iter::repeat_n(1, repair_peers.len());
435            let Some(epoch) = self.get_epoch_for_slot(slot) else {
436                error!("No epoch info for slot {slot}");
437                return Vec::from_iter(failsafe);
438            };
439            let epoch_metadata = self.epoch_metadata.read().unwrap();
440            let Some(stakeinfo) = epoch_metadata.get(&epoch) else {
441                error!("No epoch_metadata record for epoch {epoch}");
442                return Vec::from_iter(failsafe);
443            };
444            let validator_stakes = stakeinfo.validator_stakes.as_ref();
445            repair_peers
446                .iter()
447                .map(|peer| {
448                    validator_stakes
449                        .get(peer.pubkey())
450                        .cloned()
451                        .unwrap_or(1)
452                        .max(1)
453                })
454                .collect()
455        };
456        let Some(slot_peers) = self.lookup(slot) else {
457            return stakes;
458        };
459
460        repair_peers
461            .iter()
462            .map(|peer| slot_peers.get_support_by_pubkey(peer.pubkey()).unwrap_or(0))
463            .zip(stakes)
464            .map(|(a, b)| (a / 2 + b / 2).max(1u64))
465            .collect()
466    }
467
468    pub(crate) fn compute_weights_exclude_nonfrozen(
469        &self,
470        slot: Slot,
471        repair_peers: &[ContactInfo],
472    ) -> (Vec<u64>, Vec<usize>) {
473        let Some(slot_peers) = self.lookup(slot) else {
474            return (vec![], vec![]);
475        };
476        let mut weights = Vec::with_capacity(repair_peers.len());
477        let mut indices = Vec::with_capacity(repair_peers.len());
478
479        for (index, peer) in repair_peers.iter().enumerate() {
480            if let Some(stake) = slot_peers.get_support_by_pubkey(peer.pubkey()) {
481                if stake > 0 {
482                    weights.push(stake.max(1));
483                    indices.push(index);
484                }
485            }
486        }
487        (weights, indices)
488    }
489}
490
491#[cfg(test)]
492mod tests {
493    use super::*;
494
495    #[test]
496    fn test_default() {
497        let cs = ClusterSlots::default();
498        assert!(cs.cluster_slots.read().unwrap().is_empty());
499    }
500
501    #[test]
502    fn test_roll_cluster_slots() {
503        let cs = ClusterSlots::default();
504        let pk1 = Pubkey::new_unique();
505        let pk2 = Pubkey::new_unique();
506
507        let trimsize = CLUSTER_SLOTS_TRIM_SIZE as u64;
508        let validator_stakes = HashMap::from([(pk1, 10), (pk2, 20)]);
509        assert_eq!(
510            cs.cluster_slots.read().unwrap().len(),
511            0,
512            "ring should be initially empty"
513        );
514        cs.fake_epoch_info_for_tests(validator_stakes);
515        cs.roll_cluster_slots(0);
516        {
517            let rg = cs.cluster_slots.read().unwrap();
518            assert_eq!(
519                rg.len(),
520                CLUSTER_SLOTS_TRIM_SIZE,
521                "ring should have exactly {CLUSTER_SLOTS_TRIM_SIZE} elements"
522            );
523            assert_eq!(rg.front().unwrap().slot, 1, "first slot should be root + 1");
524            assert_eq!(
525                rg.back().unwrap().slot - rg.front().unwrap().slot,
526                trimsize - 1,
527                "ring should have the right size"
528            );
529        }
530        //step 1 slot
531        cs.roll_cluster_slots(1);
532        {
533            let rg = cs.cluster_slots.read().unwrap();
534            assert_eq!(rg.front().unwrap().slot, 2, "first slot should be root + 1");
535            assert_eq!(
536                rg.back().unwrap().slot - rg.front().unwrap().slot,
537                trimsize - 1,
538                "ring should have the right size"
539            );
540        }
541        let allocs = cs.metric_allocations.load(Ordering::Relaxed);
542        // make 1 full loop
543        cs.roll_cluster_slots(trimsize);
544        {
545            let rg = cs.cluster_slots.read().unwrap();
546            assert_eq!(
547                rg.front().unwrap().slot,
548                trimsize + 1,
549                "first slot should be root + 1"
550            );
551            let allocs = cs.metric_allocations.load(Ordering::Relaxed) - allocs;
552            assert_eq!(allocs, 0, "No need to allocate when rolling ringbuf");
553            assert_eq!(
554                rg.back().unwrap().slot - rg.front().unwrap().slot,
555                trimsize - 1,
556                "ring should have the right size"
557            );
558        }
559    }
560
561    fn fake_stakes() -> (Pubkey, Pubkey, ValidatorStakesMap) {
562        let pk1 = Pubkey::new_unique();
563        let pk2 = Pubkey::new_unique();
564
565        let validator_stakes = HashMap::from([(pk1, 10), (pk2, 20)]);
566        (pk1, pk2, validator_stakes)
567    }
568
569    #[test]
570    #[should_panic]
571    fn test_roll_cluster_slots_backwards() {
572        let cs = ClusterSlots::default();
573        let (_, _, validator_stakes) = fake_stakes();
574
575        cs.fake_epoch_info_for_tests(validator_stakes);
576        cs.roll_cluster_slots(10);
577        cs.roll_cluster_slots(5);
578    }
579
580    #[test]
581    fn test_update_empty() {
582        let cs = ClusterSlots::default();
583        let (pk1, _, validator_stakes) = fake_stakes();
584        cs.fake_epoch_info_for_tests(validator_stakes);
585        let epoch_slot = EpochSlots {
586            from: pk1,
587            ..Default::default()
588        };
589        cs.update_internal(0, vec![epoch_slot]);
590        assert!(cs.lookup(0).is_none());
591    }
592
593    #[test]
594    fn test_update_rooted() {
595        //root is 0, so it should be a noop
596        let cs = ClusterSlots::default();
597        let (pk1, _, validator_stakes) = fake_stakes();
598        cs.fake_epoch_info_for_tests(validator_stakes);
599        let mut epoch_slot = EpochSlots {
600            from: pk1,
601            ..Default::default()
602        };
603        epoch_slot.fill(&[0], 0);
604        cs.update_internal(0, vec![epoch_slot]);
605        assert!(cs.lookup(0).is_none());
606    }
607
608    #[test]
609    fn test_update_multiple_slots() {
610        let cs = ClusterSlots::default();
611        let (pk1, pk2, validator_stakes) = fake_stakes();
612        cs.fake_epoch_info_for_tests(validator_stakes);
613
614        let mut epoch_slot1 = EpochSlots {
615            from: pk1,
616            ..Default::default()
617        };
618        epoch_slot1.fill(&[2, 4, 5], 0);
619        let mut epoch_slot2 = EpochSlots {
620            from: pk2,
621            ..Default::default()
622        };
623        epoch_slot2.fill(&[1, 3, 5], 1);
624        cs.update_internal(0, vec![epoch_slot1, epoch_slot2]);
625        assert!(
626            cs.lookup(0).is_none(),
627            "slot 0 should not be supported by anyone"
628        );
629        assert!(cs.lookup(1).is_some(), "slot 1 should be supported");
630        assert_eq!(
631            cs.lookup(1).unwrap().get_support_by_pubkey(&pk2),
632            Some(20),
633            "support should come from validator 2"
634        );
635        assert_eq!(
636            cs.lookup(4).unwrap().get_support_by_pubkey(&pk1),
637            Some(10),
638            "validator 1 should support slot 4"
639        );
640        let map = cs.lookup(5).unwrap();
641        assert_eq!(
642            map.get_support_by_pubkey(&pk1),
643            Some(10),
644            "both should support slot 5"
645        );
646        assert_eq!(
647            map.get_support_by_pubkey(&pk2),
648            Some(20),
649            "both should support slot 5"
650        );
651    }
652
653    #[test]
654    fn test_compute_weights_failsafes() {
655        let cs = ClusterSlots::default();
656        let ci = ContactInfo::default();
657        assert_eq!(cs.compute_weights(0, &[ci]), vec![1]);
658
659        let (_, _, validator_stakes) = fake_stakes();
660        cs.fake_epoch_info_for_tests(validator_stakes);
661        let ci = ContactInfo::default();
662        assert_eq!(cs.compute_weights(0, &[ci]), vec![1]);
663    }
664
665    #[test]
666    fn test_best_peer_2() {
667        let cs = ClusterSlots::default();
668        let mut map = HashMap::default();
669        let pk1 = Pubkey::new_unique();
670        let pk2 = Pubkey::new_unique();
671        map.insert(pk1, 1000);
672        map.insert(pk2, 10);
673        map.insert(Pubkey::new_unique(), u64::MAX / 2);
674        cs.fake_epoch_info_for_tests(map);
675
676        let mut epoch_slot1 = EpochSlots {
677            from: pk1,
678            ..Default::default()
679        };
680        epoch_slot1.fill(&[1, 2, 3, 4], 0);
681        let mut epoch_slot2 = EpochSlots {
682            from: pk2,
683            ..Default::default()
684        };
685        epoch_slot2.fill(&[1, 2, 3, 4], 0);
686        // both peers have slot 1 confirmed
687        cs.update_internal(1, vec![epoch_slot1, epoch_slot2]);
688        let ci1 = ContactInfo::new(pk1, /*wallclock:*/ 0, /*shred_version:*/ 0);
689        let ci2 = ContactInfo::new(pk2, /*wallclock:*/ 0, /*shred_version:*/ 0);
690
691        assert_eq!(
692            cs.compute_weights(1, &[ci1, ci2]),
693            vec![1000, 10],
694            "weights should match the stakes"
695        );
696    }
697
698    #[test]
699    fn test_best_peer_3() {
700        solana_logger::setup_with_default("info");
701        let cs = ClusterSlots::default();
702        let pk1 = Pubkey::new_unique();
703        let pk2 = Pubkey::new_unique();
704        let pk_other = Pubkey::new_unique();
705        //set stakes of pk1 high and pk2 to unstaked
706        let validator_stakes: HashMap<_, _> =
707            [(pk1, 42), (pk_other, u64::MAX / 2)].into_iter().collect();
708        cs.fake_epoch_info_for_tests(validator_stakes);
709        let mut epoch_slot = EpochSlots {
710            from: pk_other,
711            ..Default::default()
712        };
713        epoch_slot.fill(&[1, 2, 3, 4], 0);
714        // neither pk1 or pk2 has any confirmed slots
715        cs.update_internal(0, vec![epoch_slot]);
716        let c1 = ContactInfo::new(pk1, /*wallclock:*/ 0, /*shred_version:*/ 0);
717        let c2 = ContactInfo::new(pk2, /*wallclock:*/ 0, /*shred_version:*/ 0);
718        assert_eq!(
719            cs.compute_weights(1, &[c1, c2]),
720            vec![42 / 2, 1],
721            "weights should be halved, but never zero"
722        );
723    }
724
725    #[test]
726    fn test_best_completed_slot_peer() {
727        let cs = ClusterSlots::default();
728        let contact_infos: Vec<_> = std::iter::repeat_with(|| {
729            ContactInfo::new(
730                Pubkey::new_unique(),
731                0, // wallclock
732                0, // shred_version
733            )
734        })
735        .take(2)
736        .collect();
737        let slot = 9;
738
739        // None of these validators have completed slot 9, so should
740        // return nothing
741        let (w, i) = cs.compute_weights_exclude_nonfrozen(slot, &contact_infos);
742        assert!(w.is_empty());
743        assert!(i.is_empty());
744
745        // Give second validator max stake
746        let validator_stakes: HashMap<_, _> = [
747            (*contact_infos[0].pubkey(), 42),
748            (*contact_infos[1].pubkey(), u64::MAX / 2),
749        ]
750        .into_iter()
751        .collect();
752        cs.fake_epoch_info_for_tests(validator_stakes);
753
754        // Mark the first validator as completed slot 9, should pick that validator,
755        // even though it only has minimal stake, while the other validator has
756        // max stake
757        cs.insert_node_id(slot, *contact_infos[0].pubkey());
758        let (w, i) = cs.compute_weights_exclude_nonfrozen(slot, &contact_infos);
759        assert_eq!(w, [42]);
760        assert_eq!(i, [0]);
761    }
762
763    #[test]
764    fn test_update_new_staked_slot() {
765        let cs = ClusterSlots::default();
766        let pk = Pubkey::new_unique();
767        let mut epoch_slot = EpochSlots {
768            from: pk,
769            ..Default::default()
770        };
771        epoch_slot.fill(&[1], 0);
772        let map = HashMap::from([(pk, 42)]);
773        cs.fake_epoch_info_for_tests(map);
774        cs.update_internal(0, vec![epoch_slot]);
775        assert!(cs.lookup(1).is_some(), "slot 1 should have records");
776        assert_eq!(
777            cs.lookup(1).unwrap().get_support_by_pubkey(&pk).unwrap(),
778            42,
779            "the stake of the node should be commited to the slot"
780        );
781    }
782}