Skip to main content

hotmint_network/
service.rs

1use ruc::*;
2
3use std::collections::HashMap;
4
5use futures::StreamExt;
6use hotmint_consensus::network::NetworkSink;
7use hotmint_types::sync::{SyncRequest, SyncResponse};
8use hotmint_types::{ConsensusMessage, ValidatorId};
9use litep2p::config::ConfigBuilder;
10use litep2p::protocol::notification::{
11    ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
12};
13use litep2p::protocol::request_response::{
14    ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
15};
16use litep2p::transport::tcp::config::Config as TcpConfig;
17use litep2p::types::RequestId;
18use litep2p::types::multiaddr::Multiaddr;
19use litep2p::{Litep2p, Litep2pEvent, PeerId};
20use serde::{Deserialize, Serialize};
21use tokio::sync::{mpsc, watch};
22use tracing::{debug, info, warn};
23
24const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
25const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
26const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
27const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
28
29/// Maps ValidatorId <-> PeerId for routing
30#[derive(Clone)]
31pub struct PeerMap {
32    pub validator_to_peer: HashMap<ValidatorId, PeerId>,
33    pub peer_to_validator: HashMap<PeerId, ValidatorId>,
34}
35
36impl PeerMap {
37    pub fn new() -> Self {
38        Self {
39            validator_to_peer: HashMap::new(),
40            peer_to_validator: HashMap::new(),
41        }
42    }
43
44    pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
45        self.validator_to_peer.insert(vid, pid);
46        self.peer_to_validator.insert(pid, vid);
47    }
48
49    pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
50        if let Some(pid) = self.validator_to_peer.remove(&vid) {
51            self.peer_to_validator.remove(&pid);
52            Some(pid)
53        } else {
54            None
55        }
56    }
57}
58
59impl Default for PeerMap {
60    fn default() -> Self {
61        Self::new()
62    }
63}
64
65/// Status of a peer for external queries
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct PeerStatus {
68    pub validator_id: ValidatorId,
69    pub peer_id: String,
70}
71
72/// Commands sent from the NetworkSink to the NetworkService
73pub enum NetCommand {
74    Broadcast(Vec<u8>),
75    SendTo(ValidatorId, Vec<u8>),
76    AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
77    RemovePeer(ValidatorId),
78    /// Send a sync request to a specific peer
79    SyncRequest(PeerId, Vec<u8>),
80    /// Respond to a sync request
81    SyncRespond(RequestId, Vec<u8>),
82}
83
84/// Incoming sync request forwarded to the sync responder
85pub struct IncomingSyncRequest {
86    pub request_id: RequestId,
87    pub peer: PeerId,
88    pub request: SyncRequest,
89}
90
91/// NetworkService wraps litep2p and provides consensus-level networking
92pub struct NetworkService {
93    litep2p: Litep2p,
94    notif_handle: NotificationHandle,
95    reqresp_handle: RequestResponseHandle,
96    sync_handle: RequestResponseHandle,
97    peer_map: PeerMap,
98    msg_tx: mpsc::UnboundedSender<(ValidatorId, ConsensusMessage)>,
99    cmd_rx: mpsc::UnboundedReceiver<NetCommand>,
100    sync_req_tx: mpsc::UnboundedSender<IncomingSyncRequest>,
101    sync_resp_tx: mpsc::UnboundedSender<SyncResponse>,
102    peer_info_tx: watch::Sender<Vec<PeerStatus>>,
103}
104
105impl NetworkService {
106    /// Create the network service and a NetworkSink for the consensus engine.
107    ///
108    /// Returns:
109    /// - `NetworkService` — run with `.run()`
110    /// - `Litep2pNetworkSink` — for consensus engine + RPC peer management
111    /// - `msg_rx` — consensus messages for the engine
112    /// - `sync_req_rx` — incoming sync requests for the sync responder
113    /// - `sync_resp_rx` — sync responses for the sync requester
114    /// - `peer_info_rx` — peer list updates for RPC
115    #[allow(clippy::type_complexity)]
116    pub fn create(
117        listen_addr: Multiaddr,
118        peer_map: PeerMap,
119        known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
120    ) -> Result<(
121        Self,
122        Litep2pNetworkSink,
123        mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
124        mpsc::UnboundedReceiver<IncomingSyncRequest>,
125        mpsc::UnboundedReceiver<SyncResponse>,
126        watch::Receiver<Vec<PeerStatus>>,
127    )> {
128        let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
129            .with_max_size(MAX_NOTIFICATION_SIZE)
130            .with_auto_accept_inbound(true)
131            .with_sync_channel_size(1024)
132            .with_async_channel_size(1024)
133            .build();
134
135        let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
136            .with_max_size(MAX_NOTIFICATION_SIZE)
137            .build();
138
139        let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
140            .with_max_size(MAX_NOTIFICATION_SIZE)
141            .build();
142
143        let mut config_builder = ConfigBuilder::new()
144            .with_tcp(TcpConfig {
145                listen_addresses: vec![listen_addr],
146                ..Default::default()
147            })
148            .with_notification_protocol(notif_config)
149            .with_request_response_protocol(reqresp_config)
150            .with_request_response_protocol(sync_config);
151
152        if !known_addresses.is_empty() {
153            config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
154        }
155
156        let litep2p =
157            Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
158
159        info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
160        for addr in litep2p.listen_addresses() {
161            info!(address = %addr, "listening on");
162        }
163
164        let (msg_tx, msg_rx) = mpsc::unbounded_channel();
165        let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
166        let (sync_req_tx, sync_req_rx) = mpsc::unbounded_channel();
167        let (sync_resp_tx, sync_resp_rx) = mpsc::unbounded_channel();
168
169        // Build initial peer info
170        let initial_peers: Vec<PeerStatus> = peer_map
171            .validator_to_peer
172            .iter()
173            .map(|(&vid, pid)| PeerStatus {
174                validator_id: vid,
175                peer_id: pid.to_string(),
176            })
177            .collect();
178        let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
179
180        let sink = Litep2pNetworkSink {
181            cmd_tx: cmd_tx.clone(),
182        };
183
184        Ok((
185            Self {
186                litep2p,
187                notif_handle,
188                reqresp_handle,
189                sync_handle,
190                peer_map,
191                msg_tx,
192                cmd_rx,
193                sync_req_tx,
194                sync_resp_tx,
195                peer_info_tx,
196            },
197            sink,
198            msg_rx,
199            sync_req_rx,
200            sync_resp_rx,
201            peer_info_rx,
202        ))
203    }
204
205    pub fn local_peer_id(&self) -> &PeerId {
206        self.litep2p.local_peer_id()
207    }
208
209    /// Run the network event loop
210    pub async fn run(mut self) {
211        loop {
212            tokio::select! {
213                event = self.notif_handle.next() => {
214                    if let Some(event) = event {
215                        self.handle_notification_event(event);
216                    }
217                }
218                event = self.reqresp_handle.next() => {
219                    if let Some(event) = event {
220                        self.handle_reqresp_event(event);
221                    }
222                }
223                event = self.sync_handle.next() => {
224                    if let Some(event) = event {
225                        self.handle_sync_event(event);
226                    }
227                }
228                event = self.litep2p.next_event() => {
229                    if let Some(event) = event {
230                        self.handle_litep2p_event(event);
231                    }
232                }
233                Some(cmd) = self.cmd_rx.recv() => {
234                    self.handle_command(cmd).await;
235                }
236            }
237        }
238    }
239
240    fn handle_notification_event(&mut self, event: NotificationEvent) {
241        match event {
242            NotificationEvent::ValidateSubstream { peer, .. } => {
243                self.notif_handle
244                    .send_validation_result(peer, ValidationResult::Accept);
245            }
246            NotificationEvent::NotificationStreamOpened { peer, .. } => {
247                info!(peer = %peer, "notification stream opened");
248            }
249            NotificationEvent::NotificationStreamClosed { peer } => {
250                debug!(peer = %peer, "notification stream closed");
251            }
252            NotificationEvent::NotificationReceived { peer, notification } => {
253                let sender = self
254                    .peer_map
255                    .peer_to_validator
256                    .get(&peer)
257                    .copied()
258                    .unwrap_or_default();
259                match serde_cbor_2::from_slice::<ConsensusMessage>(&notification) {
260                    Ok(msg) => {
261                        let _ = self.msg_tx.send((sender, msg));
262                    }
263                    Err(e) => {
264                        warn!(error = %e, "failed to decode notification");
265                    }
266                }
267            }
268            NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
269                warn!(peer = %peer, error = ?error, "notification stream open failed");
270            }
271        }
272    }
273
274    fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
275        match event {
276            RequestResponseEvent::RequestReceived {
277                peer,
278                request_id,
279                request,
280                ..
281            } => {
282                let sender = self
283                    .peer_map
284                    .peer_to_validator
285                    .get(&peer)
286                    .copied()
287                    .unwrap_or_default();
288                match serde_cbor_2::from_slice::<ConsensusMessage>(&request) {
289                    Ok(msg) => {
290                        let _ = self.msg_tx.send((sender, msg));
291                        self.reqresp_handle.send_response(request_id, vec![]);
292                    }
293                    Err(e) => {
294                        warn!(error = %e, "failed to decode request");
295                        self.reqresp_handle.reject_request(request_id);
296                    }
297                }
298            }
299            RequestResponseEvent::ResponseReceived { .. } => {}
300            RequestResponseEvent::RequestFailed { peer, error, .. } => {
301                debug!(peer = %peer, error = ?error, "request failed");
302            }
303        }
304    }
305
306    fn handle_sync_event(&mut self, event: RequestResponseEvent) {
307        match event {
308            RequestResponseEvent::RequestReceived {
309                peer,
310                request_id,
311                request,
312                ..
313            } => {
314                // Forward sync request to the sync responder
315                match serde_cbor_2::from_slice::<SyncRequest>(&request) {
316                    Ok(req) => {
317                        let _ = self.sync_req_tx.send(IncomingSyncRequest {
318                            request_id,
319                            peer,
320                            request: req,
321                        });
322                    }
323                    Err(e) => {
324                        warn!(error = %e, "failed to decode sync request");
325                        let err_resp = SyncResponse::Error(format!("decode error: {e}"));
326                        if let Ok(bytes) = serde_cbor_2::to_vec(&err_resp) {
327                            self.sync_handle.send_response(request_id, bytes);
328                        } else {
329                            self.sync_handle.reject_request(request_id);
330                        }
331                    }
332                }
333            }
334            RequestResponseEvent::ResponseReceived {
335                request_id: _,
336                response,
337                ..
338            } => {
339                // Forward sync response to the sync requester
340                match serde_cbor_2::from_slice::<SyncResponse>(&response) {
341                    Ok(resp) => {
342                        let _ = self.sync_resp_tx.send(resp);
343                    }
344                    Err(e) => {
345                        warn!(error = %e, "failed to decode sync response");
346                    }
347                }
348            }
349            RequestResponseEvent::RequestFailed { peer, error, .. } => {
350                debug!(peer = %peer, error = ?error, "sync request failed");
351                let _ = self
352                    .sync_resp_tx
353                    .send(SyncResponse::Error(format!("request failed: {error:?}")));
354            }
355        }
356    }
357
358    fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
359        match event {
360            Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
361                info!(peer = %peer, endpoint = ?endpoint, "connection established");
362            }
363            Litep2pEvent::ConnectionClosed { peer, .. } => {
364                debug!(peer = %peer, "connection closed");
365            }
366            Litep2pEvent::DialFailure { address, error, .. } => {
367                warn!(address = %address, error = ?error, "dial failed");
368            }
369            _ => {}
370        }
371    }
372
373    fn update_peer_info(&self) {
374        let peers: Vec<PeerStatus> = self
375            .peer_map
376            .validator_to_peer
377            .iter()
378            .map(|(&vid, pid)| PeerStatus {
379                validator_id: vid,
380                peer_id: pid.to_string(),
381            })
382            .collect();
383        let _ = self.peer_info_tx.send(peers);
384    }
385
386    async fn handle_command(&mut self, cmd: NetCommand) {
387        match cmd {
388            NetCommand::Broadcast(bytes) => {
389                for &peer in self.peer_map.peer_to_validator.keys() {
390                    let _ = self
391                        .notif_handle
392                        .send_sync_notification(peer, bytes.clone());
393                }
394            }
395            NetCommand::SendTo(target, bytes) => {
396                if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
397                    let _ = self
398                        .reqresp_handle
399                        .send_request(peer_id, bytes, DialOptions::Reject)
400                        .await;
401                }
402            }
403            NetCommand::AddPeer(vid, pid, addrs) => {
404                info!(validator = %vid, peer = %pid, "adding peer");
405                self.peer_map.insert(vid, pid);
406                self.litep2p.add_known_address(pid, addrs.into_iter());
407                self.update_peer_info();
408            }
409            NetCommand::RemovePeer(vid) => {
410                if let Some(pid) = self.peer_map.remove(vid) {
411                    info!(validator = %vid, peer = %pid, "removed peer");
412                } else {
413                    warn!(validator = %vid, "peer not found for removal");
414                }
415                self.update_peer_info();
416            }
417            NetCommand::SyncRequest(peer_id, bytes) => {
418                let _ = self
419                    .sync_handle
420                    .send_request(peer_id, bytes, DialOptions::Reject)
421                    .await;
422            }
423            NetCommand::SyncRespond(request_id, bytes) => {
424                self.sync_handle.send_response(request_id, bytes);
425            }
426        }
427    }
428}
429
430/// NetworkSink backed by litep2p, for use by the consensus engine.
431/// Also provides methods for peer management and sync.
432pub struct Litep2pNetworkSink {
433    cmd_tx: mpsc::UnboundedSender<NetCommand>,
434}
435
436impl Litep2pNetworkSink {
437    pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
438        let _ = self.cmd_tx.send(NetCommand::AddPeer(vid, pid, addrs));
439    }
440
441    pub fn remove_peer(&self, vid: ValidatorId) {
442        let _ = self.cmd_tx.send(NetCommand::RemovePeer(vid));
443    }
444
445    pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
446        if let Ok(bytes) = serde_cbor_2::to_vec(request) {
447            let _ = self.cmd_tx.send(NetCommand::SyncRequest(peer_id, bytes));
448        }
449    }
450
451    pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
452        if let Ok(bytes) = serde_cbor_2::to_vec(response) {
453            let _ = self.cmd_tx.send(NetCommand::SyncRespond(request_id, bytes));
454        }
455    }
456}
457
458impl NetworkSink for Litep2pNetworkSink {
459    fn broadcast(&self, msg: ConsensusMessage) {
460        if let Ok(bytes) = serde_cbor_2::to_vec(&msg) {
461            let _ = self.cmd_tx.send(NetCommand::Broadcast(bytes));
462        }
463    }
464
465    fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
466        if let Ok(bytes) = serde_cbor_2::to_vec(&msg) {
467            let _ = self.cmd_tx.send(NetCommand::SendTo(target, bytes));
468        }
469    }
470}