solana-core 1.4.5

Blockchain, Rebuilt for Scale
use crate::{
    cluster_info_vote_listener::VerifiedLabelVotePacketsReceiver, crds_value::CrdsValueLabel,
    result::Result,
};
use solana_perf::packet::Packets;
use std::{collections::HashMap, ops::Deref, time::Duration};

#[derive(Default)]
pub struct VerifiedVotePackets(HashMap<CrdsValueLabel, (u64, Packets)>);

impl Deref for VerifiedVotePackets {
    type Target = HashMap<CrdsValueLabel, (u64, Packets)>;
    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl VerifiedVotePackets {
    pub fn get_and_process_vote_packets(
        &mut self,
        vote_packets_receiver: &VerifiedLabelVotePacketsReceiver,
        last_update_version: &mut u64,
    ) -> Result<()> {
        let timer = Duration::from_millis(200);
        let vote_packets = vote_packets_receiver.recv_timeout(timer)?;
        *last_update_version += 1;
        for (label, packet) in vote_packets {
            self.0.insert(label, (*last_update_version, packet));
        }
        while let Ok(vote_packets) = vote_packets_receiver.try_recv() {
            for (label, packet) in vote_packets {
                self.0.insert(label, (*last_update_version, packet));
            }
        }
        Ok(())
    }

    pub fn get_latest_votes(&self, last_update_version: u64) -> (u64, Vec<Packets>) {
        let mut new_update_version = last_update_version;
        let msgs: Vec<_> = self
            .iter()
            .filter_map(|(_, (msg_update_version, msg))| {
                if *msg_update_version > last_update_version {
                    new_update_version = std::cmp::max(*msg_update_version, new_update_version);
                    Some(msg)
                } else {
                    None
                }
            })
            .cloned()
            .collect();
        (new_update_version, msgs)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::result::Error;
    use crossbeam_channel::{unbounded, RecvTimeoutError};
    use solana_perf::packet::{Meta, Packet};

    #[test]
    fn test_get_latest_votes() {
        let pubkey = solana_sdk::pubkey::new_rand();
        let label1 = CrdsValueLabel::Vote(0 as u8, pubkey);
        let label2 = CrdsValueLabel::Vote(1 as u8, pubkey);
        let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());

        let data = Packet {
            meta: Meta {
                repair: true,
                ..Meta::default()
            },
            ..Packet::default()
        };

        let none_empty_packets = Packets::new(vec![data, Packet::default()]);

        verified_vote_packets
            .0
            .insert(label1, (2, none_empty_packets));
        verified_vote_packets
            .0
            .insert(label2, (1, Packets::default()));

        // Both updates have timestamps greater than 0, so both should be returned
        let (new_update_version, updates) = verified_vote_packets.get_latest_votes(0);
        assert_eq!(new_update_version, 2);
        assert_eq!(updates.len(), 2);

        // Only the nonempty packet had a timestamp greater than 1
        let (new_update_version, updates) = verified_vote_packets.get_latest_votes(1);
        assert_eq!(new_update_version, 2);
        assert_eq!(updates.len(), 1);
        assert_eq!(updates[0].packets.is_empty(), false);

        // If the given timestamp is greater than all timestamps in any update,
        // returned timestamp should be the same as the given timestamp, and
        // no updates should be returned
        let (new_update_version, updates) = verified_vote_packets.get_latest_votes(3);
        assert_eq!(new_update_version, 3);
        assert!(updates.is_empty());
    }

    #[test]
    fn test_get_and_process_vote_packets() {
        let (s, r) = unbounded();
        let pubkey = solana_sdk::pubkey::new_rand();
        let label1 = CrdsValueLabel::Vote(0 as u8, pubkey);
        let label2 = CrdsValueLabel::Vote(1 as u8, pubkey);
        let mut update_version = 0;
        s.send(vec![(label1.clone(), Packets::default())]).unwrap();
        s.send(vec![(label2.clone(), Packets::default())]).unwrap();

        let data = Packet {
            meta: Meta {
                repair: true,
                ..Meta::default()
            },
            ..Packet::default()
        };

        let later_packets = Packets::new(vec![data, Packet::default()]);
        s.send(vec![(label1.clone(), later_packets)]).unwrap();
        let mut verified_vote_packets = VerifiedVotePackets(HashMap::new());
        verified_vote_packets
            .get_and_process_vote_packets(&r, &mut update_version)
            .unwrap();

        // Test timestamps for same batch are the same
        let update_version1 = verified_vote_packets.get(&label1).unwrap().0;
        assert_eq!(
            update_version1,
            verified_vote_packets.get(&label2).unwrap().0
        );

        // Test the later value overwrote the earlier one for this label
        assert!(verified_vote_packets.get(&label1).unwrap().1.packets.len() > 1);
        assert_eq!(
            verified_vote_packets.get(&label2).unwrap().1.packets.len(),
            0
        );

        // Test timestamp for next batch overwrites the original
        s.send(vec![(label2.clone(), Packets::default())]).unwrap();
        verified_vote_packets
            .get_and_process_vote_packets(&r, &mut update_version)
            .unwrap();
        let update_version2 = verified_vote_packets.get(&label2).unwrap().0;
        assert!(update_version2 > update_version1);

        // Test empty doesn't bump the version
        let before = update_version;
        assert_matches!(
            verified_vote_packets.get_and_process_vote_packets(&r, &mut update_version),
            Err(Error::CrossbeamRecvTimeoutError(RecvTimeoutError::Timeout))
        );
        assert_eq!(before, update_version);
    }
}