// solana-core 1.4.5
//
// Blockchain, Rebuilt for Scale
use crate::{consensus::Stake, rpc_subscriptions::RpcSubscriptions};
use solana_measure::measure::Measure;
use solana_metrics::datapoint_info;
use solana_runtime::{
    bank::Bank,
    commitment::{BlockCommitment, BlockCommitmentCache, CommitmentSlots, VOTE_THRESHOLD_SIZE},
};
use solana_sdk::clock::Slot;
use solana_vote_program::vote_state::VoteState;
use std::{
    cmp::max,
    collections::HashMap,
    sync::atomic::{AtomicBool, Ordering},
    sync::mpsc::{channel, Receiver, RecvTimeoutError, Sender},
    sync::{Arc, RwLock},
    thread::{self, Builder, JoinHandle},
    time::Duration,
};

/// One request for the aggregation thread: everything needed to rebuild the
/// block commitment cache for a single bank.
pub struct CommitmentAggregationData {
    // Bank whose ancestors and vote accounts are aggregated.
    bank: Arc<Bank>,
    // Root slot recorded in the rebuilt cache's `CommitmentSlots`.
    root: Slot,
    // Total stake stored in the rebuilt cache and used as the denominator when
    // deciding the highest confirmed root.
    total_stake: Stake,
}

impl CommitmentAggregationData {
    pub fn new(bank: Arc<Bank>, root: Slot, total_stake: Stake) -> Self {
        Self {
            bank,
            root,
            total_stake,
        }
    }
}

/// Returns the highest root slot that is backed by more than `VOTE_THRESHOLD_SIZE`
/// of `total_stake`.
///
/// `rooted_stake` holds `(root, stake)` pairs. Stake is accumulated from the highest
/// root downwards, and the first root at which the accumulated share strictly
/// exceeds the threshold is returned. Returns `0` when no root gets there,
/// including when `rooted_stake` is empty.
fn get_highest_confirmed_root(mut rooted_stake: Vec<(Slot, u64)>, total_stake: u64) -> Slot {
    // Highest roots first.
    rooted_stake.sort_by(|a, b| b.0.cmp(&a.0));

    let mut accumulated: u64 = 0;
    rooted_stake
        .into_iter()
        .find(|(_, stake)| {
            accumulated += stake;
            (accumulated as f64 / total_stake as f64) > VOTE_THRESHOLD_SIZE
        })
        .map(|(root, _)| root)
        .unwrap_or(0)
}

/// Handle to the background thread that rebuilds the block commitment cache.
pub struct AggregateCommitmentService {
    // Join handle of the aggregation thread spawned by `new`.
    t_commitment: JoinHandle<()>,
}

impl AggregateCommitmentService {
    /// Spawns the aggregation thread and returns the channel sender that feeds it,
    /// together with the service handle.
    ///
    /// The thread keeps calling `run` until `exit` is set or the channel is
    /// disconnected; a receive timeout simply starts another round.
    ///
    /// # Panics
    ///
    /// Panics if the thread cannot be spawned.
    pub fn new(
        exit: &Arc<AtomicBool>,
        block_commitment_cache: Arc<RwLock<BlockCommitmentCache>>,
        subscriptions: Arc<RpcSubscriptions>,
    ) -> (Sender<CommitmentAggregationData>, Self) {
        let (sender, receiver) = channel::<CommitmentAggregationData>();
        let exit_flag = Arc::clone(exit);

        let t_commitment = Builder::new()
            .name("solana-aggregate-stake-lockouts".to_string())
            .spawn(move || {
                while !exit_flag.load(Ordering::Relaxed) {
                    let outcome = Self::run(
                        &receiver,
                        &block_commitment_cache,
                        &subscriptions,
                        &exit_flag,
                    );
                    // Only a dropped sender ends the thread; timeouts are retried.
                    if let Err(RecvTimeoutError::Disconnected) = outcome {
                        break;
                    }
                }
            })
            .unwrap();

        (sender, Self { t_commitment })
    }

    /// Receives aggregation requests and refreshes the commitment cache until
    /// `exit` is set.
    ///
    /// Each round waits up to one second for a request, discards everything but
    /// the most recently queued one, rebuilds the cache from it and notifies the
    /// RPC subscribers with the resulting commitment slots.
    ///
    /// Returns `Ok(())` once `exit` is observed. Returns the `RecvTimeoutError`
    /// when no request arrives within the timeout or the channel is disconnected.
    fn run(
        receiver: &Receiver<CommitmentAggregationData>,
        block_commitment_cache: &RwLock<BlockCommitmentCache>,
        subscriptions: &Arc<RpcSubscriptions>,
        exit: &Arc<AtomicBool>,
    ) -> Result<(), RecvTimeoutError> {
        while !exit.load(Ordering::Relaxed) {
            let oldest = receiver.recv_timeout(Duration::from_secs(1))?;

            // Drain whatever else is already queued and keep only the newest request.
            let aggregation_data = receiver.try_iter().last().unwrap_or(oldest);

            let ancestors = aggregation_data.bank.status_cache_ancestors();
            if ancestors.is_empty() {
                continue;
            }

            let mut timer = Measure::start("aggregate-commitment-ms");
            let commitment_slots =
                Self::update_commitment_cache(block_commitment_cache, aggregation_data, ancestors);
            timer.stop();
            datapoint_info!(
                "block-commitment-cache",
                ("aggregate-commitment-ms", timer.as_ms() as i64, i64)
            );

            // Notify rpc_subscriptions as soon as the new commitment data exists, passing
            // along only the commitment slot information the notifications thread needs.
            subscriptions.notify_subscribers(commitment_slots);
        }

        Ok(())
    }

    fn update_commitment_cache(
        block_commitment_cache: &RwLock<BlockCommitmentCache>,
        aggregation_data: CommitmentAggregationData,
        ancestors: Vec<u64>,
    ) -> CommitmentSlots {
        let (block_commitment, rooted_stake) =
            Self::aggregate_commitment(&ancestors, &aggregation_data.bank);

        let highest_confirmed_root =
            get_highest_confirmed_root(rooted_stake, aggregation_data.total_stake);

        let mut new_block_commitment = BlockCommitmentCache::new(
            block_commitment,
            aggregation_data.total_stake,
            CommitmentSlots {
                slot: aggregation_data.bank.slot(),
                root: aggregation_data.root,
                highest_confirmed_slot: aggregation_data.root,
                highest_confirmed_root,
            },
        );
        let highest_confirmed_slot = new_block_commitment.calculate_highest_confirmed_slot();
        new_block_commitment.set_highest_confirmed_slot(highest_confirmed_slot);

        let mut w_block_commitment_cache = block_commitment_cache.write().unwrap();

        let highest_confirmed_root = max(
            new_block_commitment.highest_confirmed_root(),
            w_block_commitment_cache.highest_confirmed_root(),
        );
        new_block_commitment.set_highest_confirmed_root(highest_confirmed_root);

        std::mem::swap(&mut *w_block_commitment_cache, &mut new_block_commitment);
        w_block_commitment_cache.commitment_slots()
    }

    /// Aggregates the vote state of every staked vote account in `bank` over `ancestors`.
    ///
    /// Returns the per-slot commitment map together with one `(root, stake)` entry
    /// for each vote account that has a root. Vote accounts with zero stake, or
    /// whose account data does not yield a `VoteState`, are skipped.
    ///
    /// # Panics
    ///
    /// Panics if `ancestors` is empty or not strictly ascending.
    pub fn aggregate_commitment(
        ancestors: &[Slot],
        bank: &Bank,
    ) -> (HashMap<Slot, BlockCommitment>, Vec<(Slot, u64)>) {
        assert!(!ancestors.is_empty());

        // Check ancestors is sorted
        ancestors.windows(2).for_each(|a| assert!(a[0] < a[1]));

        let mut commitment = HashMap::new();
        let mut rooted_stake: Vec<(Slot, u64)> = Vec::new();

        let staked_accounts = bank
            .vote_accounts()
            .into_iter()
            .filter(|(_, (lamports, _))| *lamports != 0);
        for (_, (lamports, account)) in staked_accounts {
            if let Some(vote_state) = VoteState::from(&account) {
                Self::aggregate_commitment_for_vote_account(
                    &mut commitment,
                    &mut rooted_stake,
                    &vote_state,
                    ancestors,
                    lamports,
                );
            }
        }

        (commitment, rooted_stake)
    }

    /// Folds one vote account's state into the running commitment totals.
    ///
    /// * Every ancestor at or below the account's root gains `lamports` of rooted
    ///   stake, and `(root, lamports)` is appended to `rooted_stake`.
    /// * The remaining ancestors are walked in step with the account's votes: each
    ///   ancestor at or below a vote's slot gains `lamports` of confirmation stake
    ///   at that vote's confirmation count.
    ///
    /// `ancestors` is expected in ascending order (`aggregate_commitment` asserts
    /// this before calling).
    ///
    /// An ancestor is counted as either rooted or confirmed, never both: when the
    /// root covers every ancestor, the votes contribute nothing further.
    ///
    /// # Panics
    ///
    /// Panics if `ancestors` is empty.
    fn aggregate_commitment_for_vote_account(
        commitment: &mut HashMap<Slot, BlockCommitment>,
        rooted_stake: &mut Vec<(Slot, u64)>,
        vote_state: &VoteState,
        ancestors: &[Slot],
        lamports: u64,
    ) {
        assert!(!ancestors.is_empty());

        // Index of the first ancestor that is not covered by the root. If the root
        // covers everything this is `ancestors.len()`, so the vote walk below has
        // nothing left to visit.
        let mut ancestors_index = match vote_state.root_slot {
            Some(root) => {
                let first_unrooted = ancestors
                    .iter()
                    .position(|a| *a > root)
                    .unwrap_or(ancestors.len());
                for a in &ancestors[..first_unrooted] {
                    commitment
                        .entry(*a)
                        .or_insert_with(BlockCommitment::default)
                        .increase_rooted_stake(lamports);
                }
                rooted_stake.push((root, lamports));
                first_unrooted
            }
            None => 0,
        };

        for vote in &vote_state.votes {
            // Checked access: stop cleanly once every ancestor has been consumed.
            while let Some(&slot) = ancestors.get(ancestors_index) {
                if slot > vote.slot {
                    break;
                }
                commitment
                    .entry(slot)
                    .or_insert_with(BlockCommitment::default)
                    .increase_confirmation_stake(vote.confirmation_count as usize, lamports);
                ancestors_index += 1;
            }

            if ancestors_index == ancestors.len() {
                return;
            }
        }
    }

    pub fn join(self) -> thread::Result<()> {
        self.t_commitment.join()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use solana_ledger::genesis_utils::{create_genesis_config, GenesisConfigInfo};
    use solana_runtime::{
        bank_forks::BankForks,
        genesis_utils::{create_genesis_config_with_vote_accounts, ValidatorVoteKeypairs},
    };
    use solana_sdk::{pubkey::Pubkey, signature::Signer};
    use solana_stake_program::stake_state;
    use solana_vote_program::{
        vote_state::{self, VoteStateVersions},
        vote_transaction,
    };

    #[test]
    fn test_get_highest_confirmed_root() {
        // With no rooted stake at all the result falls back to slot 0.
        assert_eq!(get_highest_confirmed_root(Vec::new(), 10), 0);

        // Half of the stake on slot 1 is not enough; slot 0 is reached only after
        // adding the other half.
        let rooted_stake = vec![(0, 5), (1, 5)];
        assert_eq!(get_highest_confirmed_root(rooted_stake, 10), 0);

        // Unsorted input with two entries for the same root: slot 2 alone falls
        // short, and the first entry for slot 1 tips the total over the threshold.
        let rooted_stake = vec![(1, 5), (0, 10), (2, 5), (1, 4)];
        assert_eq!(get_highest_confirmed_root(rooted_stake, 10), 1);
    }

    #[test]
    fn test_aggregate_commitment_for_vote_account_1() {
        // A vote account rooted at the newest ancestor with no outstanding votes.
        let ancestors: Vec<Slot> = vec![3, 4, 5, 7, 9, 11];
        let lamports = 5;
        let root = *ancestors.last().unwrap();

        let mut vote_state = VoteState::default();
        vote_state.root_slot = Some(root);

        let mut commitment = HashMap::new();
        let mut rooted_stake = vec![];
        AggregateCommitmentService::aggregate_commitment_for_vote_account(
            &mut commitment,
            &mut rooted_stake,
            &vote_state,
            &ancestors,
            lamports,
        );

        // Every ancestor is at or below the root, so each one carries rooted stake only.
        for slot in ancestors {
            let mut expected = BlockCommitment::default();
            expected.increase_rooted_stake(lamports);
            assert_eq!(*commitment.get(&slot).unwrap(), expected);
        }
        assert_eq!(rooted_stake[0], (root, lamports));
    }

    #[test]
    fn test_aggregate_commitment_for_vote_account_2() {
        // Root part-way through the ancestors plus a single vote on the newest one.
        let ancestors: Vec<Slot> = vec![3, 4, 5, 7, 9, 11];
        let lamports = 5;
        let root = ancestors[2];

        let mut vote_state = VoteState::default();
        vote_state.root_slot = Some(root);
        vote_state.process_slot_vote_unchecked(*ancestors.last().unwrap());

        let mut commitment = HashMap::new();
        let mut rooted_stake = vec![];
        AggregateCommitmentService::aggregate_commitment_for_vote_account(
            &mut commitment,
            &mut rooted_stake,
            &vote_state,
            &ancestors,
            lamports,
        );

        // Ancestors up to the root are rooted; the rest are confirmed once by the vote.
        for slot in ancestors {
            let mut expected = BlockCommitment::default();
            if slot <= root {
                expected.increase_rooted_stake(lamports);
            } else {
                expected.increase_confirmation_stake(1, lamports);
            }
            assert_eq!(*commitment.get(&slot).unwrap(), expected);
        }
        assert_eq!(rooted_stake[0], (root, lamports));
    }

    #[test]
    fn test_aggregate_commitment_for_vote_account_3() {
        // Root part-way through the ancestors followed by two votes.
        let ancestors: Vec<Slot> = vec![3, 4, 5, 7, 9, 10, 11];
        let lamports = 5;
        let root = ancestors[2];

        let mut vote_state = VoteState::default();
        vote_state.root_slot = Some(root);
        // The two voted slots must be at most 2 apart (presumably so the first vote
        // is still locked out when the second lands; confirm against `VoteState`).
        assert!(ancestors[4] + 2 >= ancestors[6]);
        vote_state.process_slot_vote_unchecked(ancestors[4]);
        vote_state.process_slot_vote_unchecked(ancestors[6]);

        let mut commitment = HashMap::new();
        let mut rooted_stake = vec![];
        AggregateCommitmentService::aggregate_commitment_for_vote_account(
            &mut commitment,
            &mut rooted_stake,
            &vote_state,
            &ancestors,
            lamports,
        );

        // Rooted up to the root, two confirmations up to the first vote, one
        // confirmation up to the second vote.
        for (i, slot) in ancestors.iter().enumerate() {
            let mut expected = BlockCommitment::default();
            if *slot <= root {
                expected.increase_rooted_stake(lamports);
            } else if i <= 4 {
                expected.increase_confirmation_stake(2, lamports);
            } else if i <= 6 {
                expected.increase_confirmation_stake(1, lamports);
            } else {
                continue;
            }
            assert_eq!(*commitment.get(slot).unwrap(), expected);
        }
        assert_eq!(rooted_stake[0], (root, lamports));
    }

    // Runs `aggregate_commitment` against a `Bank` holding four vote accounts with
    // hand-written vote states, then checks the per-ancestor commitment and the
    // rooted stake it reports.
    #[test]
    fn test_aggregate_commitment_validity() {
        let ancestors = vec![3, 4, 5, 7, 9, 10, 11];
        let GenesisConfigInfo {
            mut genesis_config, ..
        } = create_genesis_config(10_000);

        let rooted_stake_amount = 40;

        // Four vote accounts (pk*) each paired with a stake account (sk*). The last
        // argument to `stake_state::create_account` is presumably the staked
        // lamports: 100 and 50 for the two voting accounts, `rooted_stake_amount`
        // for the two accounts that only carry a root.
        let sk1 = solana_sdk::pubkey::new_rand();
        let pk1 = solana_sdk::pubkey::new_rand();
        let mut vote_account1 =
            vote_state::create_account(&pk1, &solana_sdk::pubkey::new_rand(), 0, 100);
        let stake_account1 =
            stake_state::create_account(&sk1, &pk1, &vote_account1, &genesis_config.rent, 100);
        let sk2 = solana_sdk::pubkey::new_rand();
        let pk2 = solana_sdk::pubkey::new_rand();
        let mut vote_account2 =
            vote_state::create_account(&pk2, &solana_sdk::pubkey::new_rand(), 0, 50);
        let stake_account2 =
            stake_state::create_account(&sk2, &pk2, &vote_account2, &genesis_config.rent, 50);
        let sk3 = solana_sdk::pubkey::new_rand();
        let pk3 = solana_sdk::pubkey::new_rand();
        let mut vote_account3 =
            vote_state::create_account(&pk3, &solana_sdk::pubkey::new_rand(), 0, 1);
        let stake_account3 = stake_state::create_account(
            &sk3,
            &pk3,
            &vote_account3,
            &genesis_config.rent,
            rooted_stake_amount,
        );
        let sk4 = solana_sdk::pubkey::new_rand();
        let pk4 = solana_sdk::pubkey::new_rand();
        let mut vote_account4 =
            vote_state::create_account(&pk4, &solana_sdk::pubkey::new_rand(), 0, 1);
        let stake_account4 = stake_state::create_account(
            &sk4,
            &pk4,
            &vote_account4,
            &genesis_config.rent,
            rooted_stake_amount,
        );

        // Register every vote and stake account in genesis so the bank starts with them.
        genesis_config.accounts.extend(vec![
            (pk1, vote_account1.clone()),
            (sk1, stake_account1),
            (pk2, vote_account2.clone()),
            (sk2, stake_account2),
            (pk3, vote_account3.clone()),
            (sk3, stake_account3),
            (pk4, vote_account4.clone()),
            (sk4, stake_account4),
        ]);

        // Create bank
        let bank = Arc::new(Bank::new(&genesis_config));

        // Account 1 votes on slots 3 and 5; the updated state is written back into the bank.
        let mut vote_state1 = VoteState::from(&vote_account1).unwrap();
        vote_state1.process_slot_vote_unchecked(3);
        vote_state1.process_slot_vote_unchecked(5);
        let versioned = VoteStateVersions::Current(Box::new(vote_state1));
        VoteState::to(&versioned, &mut vote_account1).unwrap();
        bank.store_account(&pk1, &vote_account1);

        // Account 2 votes on slots 9 and 10.
        let mut vote_state2 = VoteState::from(&vote_account2).unwrap();
        vote_state2.process_slot_vote_unchecked(9);
        vote_state2.process_slot_vote_unchecked(10);
        let versioned = VoteStateVersions::Current(Box::new(vote_state2));
        VoteState::to(&versioned, &mut vote_account2).unwrap();
        bank.store_account(&pk2, &vote_account2);

        // Account 3 casts no votes and only has a root at slot 1.
        let mut vote_state3 = VoteState::from(&vote_account3).unwrap();
        vote_state3.root_slot = Some(1);
        let versioned = VoteStateVersions::Current(Box::new(vote_state3));
        VoteState::to(&versioned, &mut vote_account3).unwrap();
        bank.store_account(&pk3, &vote_account3);

        // Account 4 casts no votes and only has a root at slot 2.
        let mut vote_state4 = VoteState::from(&vote_account4).unwrap();
        vote_state4.root_slot = Some(2);
        let versioned = VoteStateVersions::Current(Box::new(vote_state4));
        VoteState::to(&versioned, &mut vote_account4).unwrap();
        bank.store_account(&pk4, &vote_account4);

        let (commitment, rooted_stake) =
            AggregateCommitmentService::aggregate_commitment(&ancestors, &bank);

        // Expected confirmation stake per ancestor, combining accounts 1 (100) and
        // 2 (50). Slot 11 lies beyond every vote, so it must have no entry at all.
        for a in ancestors {
            if a <= 3 {
                let mut expected = BlockCommitment::default();
                expected.increase_confirmation_stake(2, 150);
                assert_eq!(*commitment.get(&a).unwrap(), expected);
            } else if a <= 5 {
                let mut expected = BlockCommitment::default();
                expected.increase_confirmation_stake(1, 100);
                expected.increase_confirmation_stake(2, 50);
                assert_eq!(*commitment.get(&a).unwrap(), expected);
            } else if a <= 9 {
                let mut expected = BlockCommitment::default();
                expected.increase_confirmation_stake(2, 50);
                assert_eq!(*commitment.get(&a).unwrap(), expected);
            } else if a <= 10 {
                let mut expected = BlockCommitment::default();
                expected.increase_confirmation_stake(1, 50);
                assert_eq!(*commitment.get(&a).unwrap(), expected);
            } else {
                assert!(commitment.get(&a).is_none());
            }
        }
        // Only accounts 3 and 4 have a root, hence two rooted-stake entries. With a
        // total stake of 100 the highest confirmed root is expected to be slot 1,
        // i.e. the stake rooted at slot 2 alone is not sufficient.
        assert_eq!(rooted_stake.len(), 2);
        assert_eq!(get_highest_confirmed_root(rooted_stake, 100), 1)
    }

    // Drives `update_commitment_cache` through a fork switch and checks, after each
    // update, that the cache's highest confirmed root still names a bank that
    // `bank_forks` can return.
    #[test]
    fn test_highest_confirmed_root_advance() {
        // Reads the root slot stored in the given vote account as seen by `bank`.
        // Panics if the account is missing, undecodable or has no root yet.
        fn get_vote_account_root_slot(vote_pubkey: Pubkey, bank: &Arc<Bank>) -> Slot {
            let account = &bank.vote_accounts()[&vote_pubkey].1;
            let vote_state = VoteState::from(account).unwrap();
            vote_state.root_slot.unwrap()
        }

        let block_commitment_cache = RwLock::new(BlockCommitmentCache::new_for_tests());

        // A single validator; the `vec![100; 1]` argument is presumably its stake.
        let validator_vote_keypairs = ValidatorVoteKeypairs::new_rand();
        let validator_keypairs = vec![&validator_vote_keypairs];
        let GenesisConfigInfo {
            genesis_config,
            mint_keypair: _,
            voting_keypair: _,
        } = create_genesis_config_with_vote_accounts(
            1_000_000_000,
            &validator_keypairs,
            vec![100; 1],
        );

        let bank0 = Bank::new(&genesis_config);
        let mut bank_forks = BankForks::new(bank0);

        // Fill bank_forks with banks with votes landing in the next slot
        // Create enough banks such that vote account will root slots 0 and 1
        for x in 0..33 {
            let previous_bank = bank_forks.get(x).unwrap();
            let bank = Bank::new_from_parent(previous_bank, &Pubkey::default(), x + 1);
            let vote = vote_transaction::new_vote_transaction(
                vec![x],
                previous_bank.hash(),
                previous_bank.last_blockhash(),
                &validator_vote_keypairs.node_keypair,
                &validator_vote_keypairs.vote_keypair,
                &validator_vote_keypairs.vote_keypair,
                None,
            );
            bank.process_transaction(&vote).unwrap();
            bank_forks.insert(bank);
        }

        // Root every slot below the vote account's current root in bank_forks.
        let working_bank = bank_forks.working_bank();
        let root = get_vote_account_root_slot(
            validator_vote_keypairs.vote_keypair.pubkey(),
            &working_bank,
        );
        for x in 0..root {
            bank_forks.set_root(x, &None, None);
        }

        // Add an additional bank/vote that will root slot 2
        let bank33 = bank_forks.get(33).unwrap();
        let bank34 = Bank::new_from_parent(bank33, &Pubkey::default(), 34);
        let vote33 = vote_transaction::new_vote_transaction(
            vec![33],
            bank33.hash(),
            bank33.last_blockhash(),
            &validator_vote_keypairs.node_keypair,
            &validator_vote_keypairs.vote_keypair,
            &validator_vote_keypairs.vote_keypair,
            None,
        );
        bank34.process_transaction(&vote33).unwrap();
        bank_forks.insert(bank34);

        // First cache update, from the bank that contains the vote for slot 33.
        let working_bank = bank_forks.working_bank();
        let root = get_vote_account_root_slot(
            validator_vote_keypairs.vote_keypair.pubkey(),
            &working_bank,
        );
        let ancestors = working_bank.status_cache_ancestors();
        let _ = AggregateCommitmentService::update_commitment_cache(
            &block_commitment_cache,
            CommitmentAggregationData {
                bank: working_bank,
                root: 0,
                total_stake: 100,
            },
            ancestors,
        );
        // Advance the bank_forks root, passing the cache's highest confirmed root
        // along, and make sure the bank for that root is still retrievable.
        let highest_confirmed_root = block_commitment_cache
            .read()
            .unwrap()
            .highest_confirmed_root();
        bank_forks.set_root(root, &None, Some(highest_confirmed_root));
        let highest_confirmed_root_bank = bank_forks.get(highest_confirmed_root);
        assert!(highest_confirmed_root_bank.is_some());

        // Add a forked bank. Because the vote for bank 33 landed in the non-ancestor, the vote
        // account's root (and thus the highest_confirmed_root) rolls back to slot 1
        let bank33 = bank_forks.get(33).unwrap();
        let bank35 = Bank::new_from_parent(bank33, &Pubkey::default(), 35);
        bank_forks.insert(bank35);

        // Second cache update, from the fork. The highest confirmed root held by
        // the cache must still map to an existing bank.
        let working_bank = bank_forks.working_bank();
        let ancestors = working_bank.status_cache_ancestors();
        let _ = AggregateCommitmentService::update_commitment_cache(
            &block_commitment_cache,
            CommitmentAggregationData {
                bank: working_bank,
                root: 1,
                total_stake: 100,
            },
            ancestors,
        );
        let highest_confirmed_root = block_commitment_cache
            .read()
            .unwrap()
            .highest_confirmed_root();
        let highest_confirmed_root_bank = bank_forks.get(highest_confirmed_root);
        assert!(highest_confirmed_root_bank.is_some());

        // Add additional banks beyond lockout built on the new fork to ensure that behavior
        // continues normally
        for x in 35..=37 {
            let previous_bank = bank_forks.get(x).unwrap();
            let bank = Bank::new_from_parent(previous_bank, &Pubkey::default(), x + 1);
            let vote = vote_transaction::new_vote_transaction(
                vec![x],
                previous_bank.hash(),
                previous_bank.last_blockhash(),
                &validator_vote_keypairs.node_keypair,
                &validator_vote_keypairs.vote_keypair,
                &validator_vote_keypairs.vote_keypair,
                None,
            );
            bank.process_transaction(&vote).unwrap();
            bank_forks.insert(bank);
        }

        // Third cache update, after voting resumed on the new fork; repeat the
        // root advance and the retrievability check.
        let working_bank = bank_forks.working_bank();
        let root = get_vote_account_root_slot(
            validator_vote_keypairs.vote_keypair.pubkey(),
            &working_bank,
        );
        let ancestors = working_bank.status_cache_ancestors();
        let _ = AggregateCommitmentService::update_commitment_cache(
            &block_commitment_cache,
            CommitmentAggregationData {
                bank: working_bank,
                root: 0,
                total_stake: 100,
            },
            ancestors,
        );
        let highest_confirmed_root = block_commitment_cache
            .read()
            .unwrap()
            .highest_confirmed_root();
        bank_forks.set_root(root, &None, Some(highest_confirmed_root));
        let highest_confirmed_root_bank = bank_forks.get(highest_confirmed_root);
        assert!(highest_confirmed_root_bank.is_some());
    }
}