Skip to main content

hotmint_network/
service.rs

1use ruc::*;
2
3use std::collections::HashMap;
4
5use futures::StreamExt;
6use hotmint_consensus::network::NetworkSink;
7use hotmint_types::{ConsensusMessage, ValidatorId};
8use litep2p::config::ConfigBuilder;
9use litep2p::protocol::notification::{
10    ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
11};
12use litep2p::protocol::request_response::{
13    ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
14};
15use litep2p::transport::tcp::config::Config as TcpConfig;
16use litep2p::types::multiaddr::Multiaddr;
17use litep2p::{Litep2p, Litep2pEvent, PeerId};
18use tokio::sync::mpsc;
19use tracing::{debug, info, warn};
20
/// Protocol name handed to the notification config builder in `NetworkService::create`.
const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
/// Protocol name handed to the request/response config builder in `NetworkService::create`.
const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
/// Maximum payload size in bytes (16 MiB). Despite the name, `NetworkService::create`
/// applies it to both the notification and the request/response protocol.
const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
24
/// Maps ValidatorId <-> PeerId for routing
///
/// Both directions are stored explicitly; `PeerMap::insert` writes to both maps.
/// The fields are public, so code that edits them directly is responsible for
/// keeping the two maps consistent.
#[derive(Clone)]
pub struct PeerMap {
    /// Forward lookup: which peer a given validator is reachable at.
    pub validator_to_peer: HashMap<ValidatorId, PeerId>,
    /// Reverse lookup: which validator a given peer is registered as.
    pub peer_to_validator: HashMap<PeerId, ValidatorId>,
}
31
32impl PeerMap {
33    pub fn new() -> Self {
34        Self {
35            validator_to_peer: HashMap::new(),
36            peer_to_validator: HashMap::new(),
37        }
38    }
39
40    pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
41        self.validator_to_peer.insert(vid, pid);
42        self.peer_to_validator.insert(pid, vid);
43    }
44}
45
46impl Default for PeerMap {
47    fn default() -> Self {
48        Self::new()
49    }
50}
51
/// Commands sent from the NetworkSink to the NetworkService
///
/// Payloads are already serialized (the sink encodes with `rmp_serde` before queueing).
enum NetCommand {
    /// Deliver the bytes to every peer listed in the service's peer map.
    Broadcast(Vec<u8>),
    /// Deliver the bytes to the peer registered for the given validator.
    SendTo(ValidatorId, Vec<u8>),
}
57
/// NetworkService wraps litep2p and provides consensus-level networking
///
/// Constructed by `NetworkService::create` and driven by `NetworkService::run`.
pub struct NetworkService {
    /// The litep2p instance; `run` polls it for connection-level events.
    litep2p: Litep2p,
    /// Handle for the notification protocol; used to send broadcasts and receive notifications.
    notif_handle: NotificationHandle,
    /// Handle for the request/response protocol; used for targeted sends and inbound requests.
    reqresp_handle: RequestResponseHandle,
    /// Routing table between validator ids and litep2p peer ids.
    peer_map: PeerMap,
    /// Decoded inbound messages, paired with the sender's validator id, are pushed here.
    msg_tx: mpsc::UnboundedSender<(ValidatorId, ConsensusMessage)>,
    /// Outbound commands queued by `Litep2pNetworkSink`.
    cmd_rx: mpsc::UnboundedReceiver<NetCommand>,
}
67
68impl NetworkService {
69    /// Create the network service and a NetworkSink for the consensus engine.
70    /// Returns (service, network_sink, msg_rx_for_engine)
71    #[allow(clippy::type_complexity)]
72    pub fn create(
73        listen_addr: Multiaddr,
74        peer_map: PeerMap,
75        known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
76    ) -> Result<(
77        Self,
78        Litep2pNetworkSink,
79        mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
80    )> {
81        let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
82            .with_max_size(MAX_NOTIFICATION_SIZE)
83            .with_auto_accept_inbound(true)
84            .with_sync_channel_size(1024)
85            .with_async_channel_size(1024)
86            .build();
87
88        let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
89            .with_max_size(MAX_NOTIFICATION_SIZE)
90            .build();
91
92        let mut config_builder = ConfigBuilder::new()
93            .with_tcp(TcpConfig {
94                listen_addresses: vec![listen_addr],
95                ..Default::default()
96            })
97            .with_notification_protocol(notif_config)
98            .with_request_response_protocol(reqresp_config);
99
100        if !known_addresses.is_empty() {
101            config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
102        }
103
104        let litep2p =
105            Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
106
107        info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
108        for addr in litep2p.listen_addresses() {
109            info!(address = %addr, "listening on");
110        }
111
112        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
113        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
114
115        let sink = Litep2pNetworkSink { cmd_tx };
116
117        Ok((
118            Self {
119                litep2p,
120                notif_handle,
121                reqresp_handle,
122                peer_map,
123                msg_tx,
124                cmd_rx,
125            },
126            sink,
127            msg_rx,
128        ))
129    }
130
131    pub fn local_peer_id(&self) -> &PeerId {
132        self.litep2p.local_peer_id()
133    }
134
135    /// Run the network event loop
136    pub async fn run(mut self) {
137        loop {
138            tokio::select! {
139                event = self.notif_handle.next() => {
140                    if let Some(event) = event {
141                        self.handle_notification_event(event);
142                    }
143                }
144                event = self.reqresp_handle.next() => {
145                    if let Some(event) = event {
146                        self.handle_reqresp_event(event);
147                    }
148                }
149                event = self.litep2p.next_event() => {
150                    if let Some(event) = event {
151                        self.handle_litep2p_event(event);
152                    }
153                }
154                Some(cmd) = self.cmd_rx.recv() => {
155                    self.handle_command(cmd).await;
156                }
157            }
158        }
159    }
160
161    fn handle_notification_event(&mut self, event: NotificationEvent) {
162        match event {
163            NotificationEvent::ValidateSubstream { peer, .. } => {
164                self.notif_handle
165                    .send_validation_result(peer, ValidationResult::Accept);
166            }
167            NotificationEvent::NotificationStreamOpened { peer, .. } => {
168                info!(peer = %peer, "notification stream opened");
169            }
170            NotificationEvent::NotificationStreamClosed { peer } => {
171                debug!(peer = %peer, "notification stream closed");
172            }
173            NotificationEvent::NotificationReceived { peer, notification } => {
174                let sender = self
175                    .peer_map
176                    .peer_to_validator
177                    .get(&peer)
178                    .copied()
179                    .unwrap_or_default();
180                match rmp_serde::from_slice::<ConsensusMessage>(&notification) {
181                    Ok(msg) => {
182                        let _ = self.msg_tx.send((sender, msg));
183                    }
184                    Err(e) => {
185                        warn!(error = %e, "failed to decode notification");
186                    }
187                }
188            }
189            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
190                warn!(peer = %peer, error = ?error, "notification stream open failed");
191            }
192        }
193    }
194
195    fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
196        match event {
197            RequestResponseEvent::RequestReceived {
198                peer,
199                request_id,
200                request,
201                ..
202            } => {
203                let sender = self
204                    .peer_map
205                    .peer_to_validator
206                    .get(&peer)
207                    .copied()
208                    .unwrap_or_default();
209                match rmp_serde::from_slice::<ConsensusMessage>(&request) {
210                    Ok(msg) => {
211                        let _ = self.msg_tx.send((sender, msg));
212                        self.reqresp_handle.send_response(request_id, vec![]);
213                    }
214                    Err(e) => {
215                        warn!(error = %e, "failed to decode request");
216                        self.reqresp_handle.reject_request(request_id);
217                    }
218                }
219            }
220            RequestResponseEvent::ResponseReceived { .. } => {}
221            RequestResponseEvent::RequestFailed { peer, error, .. } => {
222                debug!(peer = %peer, error = ?error, "request failed");
223            }
224        }
225    }
226
227    fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
228        match event {
229            Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
230                info!(peer = %peer, endpoint = ?endpoint, "connection established");
231            }
232            Litep2pEvent::ConnectionClosed { peer, .. } => {
233                debug!(peer = %peer, "connection closed");
234            }
235            Litep2pEvent::DialFailure { address, error, .. } => {
236                warn!(address = %address, error = ?error, "dial failed");
237            }
238            _ => {}
239        }
240    }
241
242    async fn handle_command(&mut self, cmd: NetCommand) {
243        match cmd {
244            NetCommand::Broadcast(bytes) => {
245                // Send to all connected peers via notification
246                for &peer in self.peer_map.peer_to_validator.keys() {
247                    let _ = self
248                        .notif_handle
249                        .send_sync_notification(peer, bytes.clone());
250                }
251            }
252            NetCommand::SendTo(target, bytes) => {
253                if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
254                    let _ = self
255                        .reqresp_handle
256                        .send_request(peer_id, bytes, DialOptions::Reject)
257                        .await;
258                }
259            }
260        }
261    }
262}
263
/// NetworkSink backed by litep2p, for use by the consensus engine.
/// Sends commands to the NetworkService via a channel.
pub struct Litep2pNetworkSink {
    /// Sending half of the command channel; `NetworkService` holds the receiving half.
    cmd_tx: mpsc::UnboundedSender<NetCommand>,
}
269
270impl NetworkSink for Litep2pNetworkSink {
271    fn broadcast(&self, msg: ConsensusMessage) {
272        if let Ok(bytes) = rmp_serde::to_vec(&msg) {
273            let _ = self.cmd_tx.send(NetCommand::Broadcast(bytes));
274        }
275    }
276
277    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
278        if let Ok(bytes) = rmp_serde::to_vec(&msg) {
279            let _ = self.cmd_tx.send(NetCommand::SendTo(target, bytes));
280        }
281    }
282}