1use ruc::*;
2
3use std::collections::HashMap;
4
5use futures::StreamExt;
6use hotmint_consensus::network::NetworkSink;
7use hotmint_types::{ConsensusMessage, ValidatorId};
8use litep2p::config::ConfigBuilder;
9use litep2p::protocol::notification::{
10 ConfigBuilder as NotifConfigBuilder, NotificationEvent, NotificationHandle, ValidationResult,
11};
12use litep2p::protocol::request_response::{
13 ConfigBuilder as ReqRespConfigBuilder, DialOptions, RequestResponseEvent, RequestResponseHandle,
14};
15use litep2p::transport::tcp::config::Config as TcpConfig;
16use litep2p::types::multiaddr::Multiaddr;
17use litep2p::{Litep2p, Litep2pEvent, PeerId};
18use tokio::sync::mpsc;
19use tracing::{debug, info, warn};
20
21const NOTIF_PROTOCOL: &str = "/hotmint/consensus/notif/1";
22const REQ_RESP_PROTOCOL: &str = "/hotmint/consensus/reqresp/1";
23const MAX_NOTIFICATION_SIZE: usize = 16 * 1024 * 1024;
24
25#[derive(Clone)]
27pub struct PeerMap {
28 pub validator_to_peer: HashMap<ValidatorId, PeerId>,
29 pub peer_to_validator: HashMap<PeerId, ValidatorId>,
30}
31
32impl PeerMap {
33 pub fn new() -> Self {
34 Self {
35 validator_to_peer: HashMap::new(),
36 peer_to_validator: HashMap::new(),
37 }
38 }
39
40 pub fn insert(&mut self, vid: ValidatorId, pid: PeerId) {
41 self.validator_to_peer.insert(vid, pid);
42 self.peer_to_validator.insert(pid, vid);
43 }
44}
45
46impl Default for PeerMap {
47 fn default() -> Self {
48 Self::new()
49 }
50}
51
52enum NetCommand {
54 Broadcast(Vec<u8>),
55 SendTo(ValidatorId, Vec<u8>),
56}
57
58pub struct NetworkService {
60 litep2p: Litep2p,
61 notif_handle: NotificationHandle,
62 reqresp_handle: RequestResponseHandle,
63 peer_map: PeerMap,
64 msg_tx: mpsc::UnboundedSender<(ValidatorId, ConsensusMessage)>,
65 cmd_rx: mpsc::UnboundedReceiver<NetCommand>,
66}
67
68impl NetworkService {
69 #[allow(clippy::type_complexity)]
72 pub fn create(
73 listen_addr: Multiaddr,
74 peer_map: PeerMap,
75 known_addresses: Vec<(PeerId, Vec<Multiaddr>)>,
76 ) -> Result<(
77 Self,
78 Litep2pNetworkSink,
79 mpsc::UnboundedReceiver<(ValidatorId, ConsensusMessage)>,
80 )> {
81 let (notif_config, notif_handle) = NotifConfigBuilder::new(NOTIF_PROTOCOL.into())
82 .with_max_size(MAX_NOTIFICATION_SIZE)
83 .with_auto_accept_inbound(true)
84 .with_sync_channel_size(1024)
85 .with_async_channel_size(1024)
86 .build();
87
88 let (reqresp_config, reqresp_handle) = ReqRespConfigBuilder::new(REQ_RESP_PROTOCOL.into())
89 .with_max_size(MAX_NOTIFICATION_SIZE)
90 .build();
91
92 let mut config_builder = ConfigBuilder::new()
93 .with_tcp(TcpConfig {
94 listen_addresses: vec![listen_addr],
95 ..Default::default()
96 })
97 .with_notification_protocol(notif_config)
98 .with_request_response_protocol(reqresp_config);
99
100 if !known_addresses.is_empty() {
101 config_builder = config_builder.with_known_addresses(known_addresses.into_iter());
102 }
103
104 let litep2p =
105 Litep2p::new(config_builder.build()).c(d!("failed to create litep2p instance"))?;
106
107 info!(peer_id = %litep2p.local_peer_id(), "litep2p started");
108 for addr in litep2p.listen_addresses() {
109 info!(address = %addr, "listening on");
110 }
111
112 let (msg_tx, msg_rx) = mpsc::unbounded_channel();
113 let (cmd_tx, cmd_rx) = mpsc::unbounded_channel();
114
115 let sink = Litep2pNetworkSink { cmd_tx };
116
117 Ok((
118 Self {
119 litep2p,
120 notif_handle,
121 reqresp_handle,
122 peer_map,
123 msg_tx,
124 cmd_rx,
125 },
126 sink,
127 msg_rx,
128 ))
129 }
130
131 pub fn local_peer_id(&self) -> &PeerId {
132 self.litep2p.local_peer_id()
133 }
134
135 pub async fn run(mut self) {
137 loop {
138 tokio::select! {
139 event = self.notif_handle.next() => {
140 if let Some(event) = event {
141 self.handle_notification_event(event);
142 }
143 }
144 event = self.reqresp_handle.next() => {
145 if let Some(event) = event {
146 self.handle_reqresp_event(event);
147 }
148 }
149 event = self.litep2p.next_event() => {
150 if let Some(event) = event {
151 self.handle_litep2p_event(event);
152 }
153 }
154 Some(cmd) = self.cmd_rx.recv() => {
155 self.handle_command(cmd).await;
156 }
157 }
158 }
159 }
160
161 fn handle_notification_event(&mut self, event: NotificationEvent) {
162 match event {
163 NotificationEvent::ValidateSubstream { peer, .. } => {
164 self.notif_handle
165 .send_validation_result(peer, ValidationResult::Accept);
166 }
167 NotificationEvent::NotificationStreamOpened { peer, .. } => {
168 info!(peer = %peer, "notification stream opened");
169 }
170 NotificationEvent::NotificationStreamClosed { peer } => {
171 debug!(peer = %peer, "notification stream closed");
172 }
173 NotificationEvent::NotificationReceived { peer, notification } => {
174 let sender = self
175 .peer_map
176 .peer_to_validator
177 .get(&peer)
178 .copied()
179 .unwrap_or_default();
180 match rmp_serde::from_slice::<ConsensusMessage>(¬ification) {
181 Ok(msg) => {
182 let _ = self.msg_tx.send((sender, msg));
183 }
184 Err(e) => {
185 warn!(error = %e, "failed to decode notification");
186 }
187 }
188 }
189 NotificationEvent::NotificationStreamOpenFailure { peer, error } => {
190 warn!(peer = %peer, error = ?error, "notification stream open failed");
191 }
192 }
193 }
194
195 fn handle_reqresp_event(&mut self, event: RequestResponseEvent) {
196 match event {
197 RequestResponseEvent::RequestReceived {
198 peer,
199 request_id,
200 request,
201 ..
202 } => {
203 let sender = self
204 .peer_map
205 .peer_to_validator
206 .get(&peer)
207 .copied()
208 .unwrap_or_default();
209 match rmp_serde::from_slice::<ConsensusMessage>(&request) {
210 Ok(msg) => {
211 let _ = self.msg_tx.send((sender, msg));
212 self.reqresp_handle.send_response(request_id, vec![]);
213 }
214 Err(e) => {
215 warn!(error = %e, "failed to decode request");
216 self.reqresp_handle.reject_request(request_id);
217 }
218 }
219 }
220 RequestResponseEvent::ResponseReceived { .. } => {}
221 RequestResponseEvent::RequestFailed { peer, error, .. } => {
222 debug!(peer = %peer, error = ?error, "request failed");
223 }
224 }
225 }
226
227 fn handle_litep2p_event(&mut self, event: Litep2pEvent) {
228 match event {
229 Litep2pEvent::ConnectionEstablished { peer, endpoint } => {
230 info!(peer = %peer, endpoint = ?endpoint, "connection established");
231 }
232 Litep2pEvent::ConnectionClosed { peer, .. } => {
233 debug!(peer = %peer, "connection closed");
234 }
235 Litep2pEvent::DialFailure { address, error, .. } => {
236 warn!(address = %address, error = ?error, "dial failed");
237 }
238 _ => {}
239 }
240 }
241
242 async fn handle_command(&mut self, cmd: NetCommand) {
243 match cmd {
244 NetCommand::Broadcast(bytes) => {
245 for &peer in self.peer_map.peer_to_validator.keys() {
247 let _ = self
248 .notif_handle
249 .send_sync_notification(peer, bytes.clone());
250 }
251 }
252 NetCommand::SendTo(target, bytes) => {
253 if let Some(&peer_id) = self.peer_map.validator_to_peer.get(&target) {
254 let _ = self
255 .reqresp_handle
256 .send_request(peer_id, bytes, DialOptions::Reject)
257 .await;
258 }
259 }
260 }
261 }
262}
263
264pub struct Litep2pNetworkSink {
267 cmd_tx: mpsc::UnboundedSender<NetCommand>,
268}
269
270impl NetworkSink for Litep2pNetworkSink {
271 fn broadcast(&self, msg: ConsensusMessage) {
272 if let Ok(bytes) = rmp_serde::to_vec(&msg) {
273 let _ = self.cmd_tx.send(NetCommand::Broadcast(bytes));
274 }
275 }
276
277 fn send_to(&self, target: ValidatorId, msg: ConsensusMessage) {
278 if let Ok(bytes) = rmp_serde::to_vec(&msg) {
279 let _ = self.cmd_tx.send(NetCommand::SendTo(target, bytes));
280 }
281 }
282}