use {
    crate::{
        cluster_info::{ClusterInfo, GOSSIP_SLEEP_MILLIS},
        crds::Cursor,
        duplicate_shred::DuplicateShred,
    },
    std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        thread::{self, sleep, Builder, JoinHandle},
        time::Duration,
    },
};

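/// Callback invoked for each `DuplicateShred` entry pulled from gossip.
/// Implementations must be `Send` because the handler runs on the listener thread.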
pub trait DuplicateShredHandlerTrait: Send {
    fn handle(&mut self, data: DuplicateShred);
}

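/// Background service that polls gossip (`ClusterInfo`) for duplicate shred
/// proofs and forwards each entry to the configured handler.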
pub struct DuplicateShredListener {
    thread_hdl: JoinHandle<()>,
}

// Right now we only need to process duplicate proofs; in the future the receiver
// should be a map from enum values to handlers.
impl DuplicateShredListener {
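    /// Spawns the listener thread; it keeps polling gossip until `exit` is set.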
    pub fn new(
        exit: Arc<AtomicBool>,
        cluster_info: Arc<ClusterInfo>,
        handler: impl DuplicateShredHandlerTrait + 'static,
    ) -> Self {
        let listen_thread = Builder::new()
            .name("solCiEntryLstnr".to_string())
            .spawn(move || {
                Self::recv_loop(exit, &cluster_info, handler);
            })
            .unwrap();

        Self {
            thread_hdl: listen_thread,
        }
    }

    pub fn join(self) -> thread::Result<()> {
        self.thread_hdl.join()
    }

    // Entries are handed to the handler one at a time rather than in a batch because
    // in the future we may route different types of CrdsData to different handlers.
    fn recv_loop(
        exit: Arc<AtomicBool>,
        cluster_info: &ClusterInfo,
        mut handler: impl DuplicateShredHandlerTrait + 'static,
    ) {
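        // The cursor remembers how far into the gossip CRDS table we have read,
        // so each poll only yields entries inserted since the previous iteration.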
        let mut cursor = Cursor::default();
        while !exit.load(Ordering::Relaxed) {
            let entries: Vec<DuplicateShred> = cluster_info.get_duplicate_shreds(&mut cursor);
            for entry in entries {
                handler.handle(entry);
            }
            sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS));
        }
    }
}

#[cfg(test)]
mod tests {
    use {
        super::*,
        crate::{
            cluster_info::Node, duplicate_shred::tests::new_rand_shred,
            duplicate_shred_listener::DuplicateShredHandlerTrait,
        },
        solana_ledger::shred::Shredder,
        solana_sdk::signature::{Keypair, Signer},
        solana_streamer::socket::SocketAddrSpace,
        std::sync::{
            atomic::{AtomicU32, Ordering},
            Arc,
        },
    };
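
    // Test handler that just counts how many DuplicateShred entries it receives.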
    struct FakeHandler {
        count: Arc<AtomicU32>,
    }

    impl FakeHandler {
        fn new(count: Arc<AtomicU32>) -> Self {
            Self { count }
        }
    }

    impl DuplicateShredHandlerTrait for FakeHandler {
        fn handle(&mut self, data: DuplicateShred) {
            assert!(data.num_chunks() > 0);
            self.count.fetch_add(1, Ordering::Relaxed);
        }
    }

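    // Push one duplicate-shred proof into gossip and verify the listener thread
    // delivers every resulting chunk to the handler.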
    #[test]
    fn test_listener_get_entries() {
        let host1_key = Arc::new(Keypair::new());
        let node = Node::new_localhost_with_pubkey(&host1_key.pubkey());
        let cluster_info = Arc::new(ClusterInfo::new(
            node.info,
            host1_key,
            SocketAddrSpace::Unspecified,
        ));
        let exit = Arc::new(AtomicBool::new(false));
        let count = Arc::new(AtomicU32::new(0));
        let handler = FakeHandler::new(count.clone());
        let listener = DuplicateShredListener::new(exit.clone(), cluster_info.clone(), handler);
        let mut rng = rand::thread_rng();
        let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
        let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
        let next_shred_index = 353;
        let leader = Arc::new(Keypair::new());
        let shred1 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
        let shred2 = new_rand_shred(&mut rng, next_shred_index, &shredder, &leader);
        assert!(cluster_info
            .push_duplicate_shred(&shred1, shred2.payload())
            .is_ok());
        cluster_info.flush_push_queue();
        // Sleep long enough for the listener thread to wake up and drain the queue.
        sleep(Duration::from_millis(GOSSIP_SLEEP_MILLIS * 2));
        // The single proof (two shred payloads) is expected to be split into three
        // gossip chunks, and the handler is invoked once per chunk.
        assert_eq!(count.load(Ordering::Relaxed), 3);
        exit.store(true, Ordering::Relaxed);
        assert!(listener.join().is_ok());
    }
}