Skip to main content

hotmint_consensus/
network.rs

1use std::collections::HashMap;
2
3use hotmint_types::epoch::EpochNumber;
4use hotmint_types::evidence::EquivocationProof;
5use hotmint_types::{ConsensusMessage, ValidatorId, ValidatorSet};
6
7/// Message type for the consensus channel: `(sender_id, message)`.
8pub type MsgSender = tokio::sync::mpsc::Sender<(Option<ValidatorId>, ConsensusMessage)>;
9pub type MsgReceiver = tokio::sync::mpsc::Receiver<(Option<ValidatorId>, ConsensusMessage)>;
10
11pub trait NetworkSink: Send + Sync {
12    fn broadcast(&self, msg: ConsensusMessage);
13    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage);
14    /// Notify the network layer of a validator set change (epoch transition).
15    /// Default is no-op for test stubs.
16    fn on_epoch_change(&self, _epoch: EpochNumber, _new_validator_set: &ValidatorSet) {}
17
18    /// Broadcast equivocation evidence to all peers.
19    /// Default is no-op for test stubs.
20    fn broadcast_evidence(&self, _proof: &EquivocationProof) {}
21}
22
23/// Channel-based network stub: routes messages via mpsc senders
24pub struct ChannelNetwork {
25    pub self_id: ValidatorId,
26    pub senders: Vec<(ValidatorId, MsgSender)>,
27}
28
29impl ChannelNetwork {
30    pub fn new(self_id: ValidatorId, senders: Vec<(ValidatorId, MsgSender)>) -> Self {
31        Self { self_id, senders }
32    }
33
34    /// Create a fully-connected mesh of `n` channel networks.
35    ///
36    /// Returns one `(ChannelNetwork, Receiver)` pair per validator
37    /// (indexed by `ValidatorId(0)` .. `ValidatorId(n-1)`),
38    /// eliminating the manual HashMap plumbing.
39    pub fn create_mesh(n: u64) -> Vec<(ChannelNetwork, MsgReceiver)> {
40        let mut senders: HashMap<ValidatorId, MsgSender> = HashMap::new();
41        let mut receivers: HashMap<ValidatorId, MsgReceiver> = HashMap::new();
42
43        for i in 0..n {
44            let (tx, rx) = tokio::sync::mpsc::channel(8192);
45            senders.insert(ValidatorId(i), tx);
46            receivers.insert(ValidatorId(i), rx);
47        }
48
49        let all_senders: Vec<(ValidatorId, MsgSender)> =
50            senders.iter().map(|(&id, tx)| (id, tx.clone())).collect();
51
52        (0..n)
53            .map(|i| {
54                let vid = ValidatorId(i);
55                let rx = receivers.remove(&vid).unwrap();
56                let network = ChannelNetwork::new(vid, all_senders.clone());
57                (network, rx)
58            })
59            .collect()
60    }
61}
62
63impl NetworkSink for ChannelNetwork {
64    fn broadcast(&self, msg: ConsensusMessage) {
65        for (id, sender) in &self.senders {
66            if *id != self.self_id {
67                let _ = sender.try_send((Some(self.self_id), msg.clone()));
68            }
69        }
70    }
71
72    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
73        for (id, sender) in &self.senders {
74            if *id == target {
75                let _ = sender.try_send((Some(self.self_id), msg));
76                return;
77            }
78        }
79    }
80}