1use ruc::*;
2
3use std::collections::HashMap;
4
5use futures::StreamExt;
6use hotmint_consensus::network::NetworkSink;
7use hotmint_types::sync::{SyncRequest, SyncResponse};
8use hotmint_types::{ConsensusMessage, ValidatorId};
9use litep2p::config::ConfigBuilder;
10use litep2p::protocol::notification::{
11 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
12};
13use litep2p::protocol::request_response::{
14 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
15};
16use litep2p::transport::tcp::config::Config as TcpConfig;
17use litep2p::types::RequestId;
18use litep2p::types::multiaddr::Multiaddr;
19use litep2p::{Litep2p, Litep2pEvent, PeerId};
20use serde::{Deserialize, Serialize};
21use tokio::sync::{mpsc, watch};
22use tracing::{debug, info, warn};
23
24const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
25const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
26const SYNC_PROTOCOL: &str = "/hotmint/sync/1";
27const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
28
29#[derive(Clone)]
31pub struct PeerMap {
32 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
33 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
34}
35
36impl PeerMap {
37 pub fn new() -> Self {
38 Self {
39 validator_to_peer: HashMap::new(),
40 peer_to_validator: HashMap::new(),
41 }
42 }
43
44 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
45 self.validator_to_peer.insert(vid, pid);
46 self.peer_to_validator.insert(pid, vid);
47 }
48
49 pub fn remove(&mut self, vid: ValidatorId) -> Option<PeerId> {
50 if let Some(pid) = self.validator_to_peer.remove(&vid) {
51 self.peer_to_validator.remove(&pid);
52 Some(pid)
53 } else {
54 None
55 }
56 }
57}
58
59impl Default for PeerMap {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct PeerStatus {
68 pub validator_id: ValidatorId,
69 pub peer_id: String,
70}
71
72pub enum NetCommand {
74 Broadcast(Vec<u8>),
75 SendTo(ValidatorId, Vec<u8>),
76 AddPeer(ValidatorId, PeerId, Vec<Multiaddr>),
77 RemovePeer(ValidatorId),
78 SyncRequest(PeerId, Vec<u8>),
80 SyncRespond(RequestId, Vec<u8>),
82}
83
84pub struct IncomingSyncRequest {
86 pub request_id: RequestId,
87 pub peer: PeerId,
88 pub request: SyncRequest,
89}
90
91pub struct NetworkService {
93 litep2p: Litep2p,
94 notif_handle: NotificationHandle,
95 reqresp_handle: RequestResponseHandle,
96 sync_handle: RequestResponseHandle,
97 peer_map: PeerMap,
98 msg_tx: mpsc::UnboundedSender<(ValidatorId, ConsensusMessage)>,
99 cmd_rx: mpsc::UnboundedReceiver<NetCommand>,
100 sync_req_tx: mpsc::UnboundedSender<IncomingSyncRequest>,
101 sync_resp_tx: mpsc::UnboundedSender<SyncResponse>,
102 peer_info_tx: watch::Sender<Vec<PeerStatus>>,
103}
104
105impl NetworkService {
106 #[allow(clippy::type_complexity)]
116 pub fn create(
117 listen_addr: Multiaddr,
118 peer_map: PeerMap,
119 known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
120 ) -> Result<(
121 Self,
122 Litep2pNetworkSink,
123 mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
124 mpsc::UnboundedReceiver<IncomingSyncRequest>,
125 mpsc::UnboundedReceiver<SyncResponse>,
126 watch::Receiver<Vec<PeerStatus>>,
127 )> {
128 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
129 .with_max_size(MAX_NOTIFICATION_SIZE)
130 .with_auto_accept_inbound(true)
131 .with_sync_channel_size(1024)
132 .with_async_channel_size(1024)
133 .build();
134
135 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
136 .with_max_size(MAX_NOTIFICATION_SIZE)
137 .build();
138
139 let (sync_config, sync_handle) = ReqRespConfigBuilder::new(SYNC_PROTOCOL.into())
140 .with_max_size(MAX_NOTIFICATION_SIZE)
141 .build();
142
143 let mut config_builder = ConfigBuilder::new()
144 .with_tcp(TcpConfig {
145 listen_addresses: vec![listen_addr],
146 ..Default::default()
147 })
148 .with_notification_protocol(notif_config)
149 .with_request_response_protocol(reqresp_config)
150 .with_request_response_protocol(sync_config);
151
152 if !known_addresses.is_empty() {
153 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
154 }
155
156 let litep2p =
157 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
158
159 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
160 for addr in litep2p.listen_addresses() {
161 info!(address = %addr, "listening on");
162 }
163
164 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
165 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
166 let (sync_req_tx, sync_req_rx) = mpsc::unbounded_channel();
167 let (sync_resp_tx, sync_resp_rx) = mpsc::unbounded_channel();
168
169 let initial_peers: Vec<PeerStatus> = peer_map
171 .validator_to_peer
172 .iter()
173 .map(|(&vid, pid)| PeerStatus {
174 validator_id: vid,
175 peer_id: pid.to_string(),
176 })
177 .collect();
178 let (peer_info_tx, peer_info_rx) = watch::channel(initial_peers);
179
180 let sink = Litep2pNetworkSink {
181 cmd_tx: cmd_tx.clone(),
182 };
183
184 Ok((
185 Self {
186 litep2p,
187 notif_handle,
188 reqresp_handle,
189 sync_handle,
190 peer_map,
191 msg_tx,
192 cmd_rx,
193 sync_req_tx,
194 sync_resp_tx,
195 peer_info_tx,
196 },
197 sink,
198 msg_rx,
199 sync_req_rx,
200 sync_resp_rx,
201 peer_info_rx,
202 ))
203 }
204
205 pub fn local_peer_id(&self) -> &PeerId {
206 self.litep2p.local_peer_id()
207 }
208
209 pub async fn run(mut self) {
211 loop {
212 tokio::select! {
213 event = self.notif_handle.next() => {
214 if let Some(event) = event {
215 self.handle_notification_event(event);
216 }
217 }
218 event = self.reqresp_handle.next() => {
219 if let Some(event) = event {
220 self.handle_reqresp_event(event);
221 }
222 }
223 event = self.sync_handle.next() => {
224 if let Some(event) = event {
225 self.handle_sync_event(event);
226 }
227 }
228 event = self.litep2p.next_event() => {
229 if let Some(event) = event {
230 self.handle_litep2p_event(event);
231 }
232 }
233 Some(cmd) = self.cmd_rx.recv() => {
234 self.handle_command(cmd).await;
235 }
236 }
237 }
238 }
239
240 fn handle_notification_event(&mut self, event: NotificationEvent) {
241 match event {
242 NotificationEvent::ValidateSubstream { peer, .. } => {
243 self.notif_handle
244 .send_validation_result(peer, ValidationResult::Accept);
245 }
246 NotificationEvent::NotificationStreamOpened { peer, .. } => {
247 info!(peer = %peer, "notification stream opened");
248 }
249 NotificationEvent::NotificationStreamClosed { peer } => {
250 debug!(peer = %peer, "notification stream closed");
251 }
252 NotificationEvent::NotificationReceived { peer, notification } => {
253 let sender = self
254 .peer_map
255 .peer_to_validator
256 .get(&peer)
257 .copied()
258 .unwrap_or_default();
259 match serde_cbor_2::from_slice::<ConsensusMessage>(¬ification) {
260 Ok(msg) => {
261 let _ = self.msg_tx.send((sender, msg));
262 }
263 Err(e) => {
264 warn!(error = %e, "failed to decode notification");
265 }
266 }
267 }
268 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
269 warn!(peer = %peer, error = ?error, "notification stream open failed");
270 }
271 }
272 }
273
274 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
275 match event {
276 RequestResponseEvent::RequestReceived {
277 peer,
278 request_id,
279 request,
280 ..
281 } => {
282 let sender = self
283 .peer_map
284 .peer_to_validator
285 .get(&peer)
286 .copied()
287 .unwrap_or_default();
288 match serde_cbor_2::from_slice::<ConsensusMessage>(&request) {
289 Ok(msg) => {
290 let _ = self.msg_tx.send((sender, msg));
291 self.reqresp_handle.send_response(request_id, vec![]);
292 }
293 Err(e) => {
294 warn!(error = %e, "failed to decode request");
295 self.reqresp_handle.reject_request(request_id);
296 }
297 }
298 }
299 RequestResponseEvent::ResponseReceived { .. } => {}
300 RequestResponseEvent::RequestFailed { peer, error, .. } => {
301 debug!(peer = %peer, error = ?error, "request failed");
302 }
303 }
304 }
305
306 fn handle_sync_event(&mut self, event: RequestResponseEvent) {
307 match event {
308 RequestResponseEvent::RequestReceived {
309 peer,
310 request_id,
311 request,
312 ..
313 } => {
314 match serde_cbor_2::from_slice::<SyncRequest>(&request) {
316 Ok(req) => {
317 let _ = self.sync_req_tx.send(IncomingSyncRequest {
318 request_id,
319 peer,
320 request: req,
321 });
322 }
323 Err(e) => {
324 warn!(error = %e, "failed to decode sync request");
325 let err_resp = SyncResponse::Error(format!("decode error: {e}"));
326 if let Ok(bytes) = serde_cbor_2::to_vec(&err_resp) {
327 self.sync_handle.send_response(request_id, bytes);
328 } else {
329 self.sync_handle.reject_request(request_id);
330 }
331 }
332 }
333 }
334 RequestResponseEvent::ResponseReceived {
335 request_id: _,
336 response,
337 ..
338 } => {
339 match serde_cbor_2::from_slice::<SyncResponse>(&response) {
341 Ok(resp) => {
342 let _ = self.sync_resp_tx.send(resp);
343 }
344 Err(e) => {
345 warn!(error = %e, "failed to decode sync response");
346 }
347 }
348 }
349 RequestResponseEvent::RequestFailed { peer, error, .. } => {
350 debug!(peer = %peer, error = ?error, "sync request failed");
351 let _ = self
352 .sync_resp_tx
353 .send(SyncResponse::Error(format!("request failed: {error:?}")));
354 }
355 }
356 }
357
358 fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
359 match event {
360 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
361 info!(peer = %peer, endpoint = ?endpoint, "connection established");
362 }
363 Litep2pEvent::ConnectionClosed { peer, .. } => {
364 debug!(peer = %peer, "connection closed");
365 }
366 Litep2pEvent::DialFailure { address, error, .. } => {
367 warn!(address = %address, error = ?error, "dial failed");
368 }
369 _ => {}
370 }
371 }
372
373 fn update_peer_info(&self) {
374 let peers: Vec<PeerStatus> = self
375 .peer_map
376 .validator_to_peer
377 .iter()
378 .map(|(&vid, pid)| PeerStatus {
379 validator_id: vid,
380 peer_id: pid.to_string(),
381 })
382 .collect();
383 let _ = self.peer_info_tx.send(peers);
384 }
385
386 async fn handle_command(&mut self, cmd: NetCommand) {
387 match cmd {
388 NetCommand::Broadcast(bytes) => {
389 for &peer in self.peer_map.peer_to_validator.keys() {
390 let _ = self
391 .notif_handle
392 .send_sync_notification(peer, bytes.clone());
393 }
394 }
395 NetCommand::SendTo(target, bytes) => {
396 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
397 let _ = self
398 .reqresp_handle
399 .send_request(peer_id, bytes, DialOptions::Reject)
400 .await;
401 }
402 }
403 NetCommand::AddPeer(vid, pid, addrs) => {
404 info!(validator = %vid, peer = %pid, "adding peer");
405 self.peer_map.insert(vid, pid);
406 self.litep2p.add_known_address(pid, addrs.into_iter());
407 self.update_peer_info();
408 }
409 NetCommand::RemovePeer(vid) => {
410 if let Some(pid) = self.peer_map.remove(vid) {
411 info!(validator = %vid, peer = %pid, "removed peer");
412 } else {
413 warn!(validator = %vid, "peer not found for removal");
414 }
415 self.update_peer_info();
416 }
417 NetCommand::SyncRequest(peer_id, bytes) => {
418 let _ = self
419 .sync_handle
420 .send_request(peer_id, bytes, DialOptions::Reject)
421 .await;
422 }
423 NetCommand::SyncRespond(request_id, bytes) => {
424 self.sync_handle.send_response(request_id, bytes);
425 }
426 }
427 }
428}
429
430pub struct Litep2pNetworkSink {
433 cmd_tx: mpsc::UnboundedSender<NetCommand>,
434}
435
436impl Litep2pNetworkSink {
437 pub fn add_peer(&self, vid: ValidatorId, pid: PeerId, addrs: Vec<Multiaddr>) {
438 let _ = self.cmd_tx.send(NetCommand::AddPeer(vid, pid, addrs));
439 }
440
441 pub fn remove_peer(&self, vid: ValidatorId) {
442 let _ = self.cmd_tx.send(NetCommand::RemovePeer(vid));
443 }
444
445 pub fn send_sync_request(&self, peer_id: PeerId, request: &SyncRequest) {
446 if let Ok(bytes) = serde_cbor_2::to_vec(request) {
447 let _ = self.cmd_tx.send(NetCommand::SyncRequest(peer_id, bytes));
448 }
449 }
450
451 pub fn send_sync_response(&self, request_id: RequestId, response: &SyncResponse) {
452 if let Ok(bytes) = serde_cbor_2::to_vec(response) {
453 let _ = self.cmd_tx.send(NetCommand::SyncRespond(request_id, bytes));
454 }
455 }
456}
457
458impl NetworkSink for Litep2pNetworkSink {
459 fn broadcast(&self, msg: ConsensusMessage) {
460 if let Ok(bytes) = serde_cbor_2::to_vec(&msg) {
461 let _ = self.cmd_tx.send(NetCommand::Broadcast(bytes));
462 }
463 }
464
465 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
466 if let Ok(bytes) = serde_cbor_2::to_vec(&msg) {
467 let _ = self.cmd_tx.send(NetCommand::SendTo(target, bytes));
468 }
469 }
470}