1use crate::{
2 behaviours::{BlueprintBehaviour, BlueprintBehaviourConfig, BlueprintBehaviourEvent},
3 blueprint_protocol::{BlueprintProtocolEvent, InstanceMessageRequest, InstanceMessageResponse},
4 discovery::{
5 PeerInfo, PeerManager,
6 behaviour::{DerivedDiscoveryBehaviourEvent, DiscoveryEvent},
7 },
8 error::Error,
9 service_handle::NetworkServiceHandle,
10 types::ProtocolMessage,
11};
12use alloy_primitives::Address;
13use blueprint_core::{debug, info, trace, warn};
14use blueprint_crypto::KeyType;
15use blueprint_std::{fmt::Display, sync::Arc, time::Duration};
16use crossbeam_channel::{self, Receiver, SendError, Sender};
17use futures::StreamExt;
18use libp2p::{
19 Multiaddr, PeerId, Swarm, SwarmBuilder, identify,
20 identity::Keypair,
21 kad, mdns, ping,
22 swarm::{SwarmEvent, dial_opts::DialOpts},
23};
24use std::collections::HashSet;
25
26pub enum AllowedKeys<K: KeyType> {
27 EvmAddresses(HashSet<Address>),
28 InstancePublicKeys(HashSet<K::Public>),
29}
30
31impl<K: KeyType> Default for AllowedKeys<K> {
32 fn default() -> Self {
33 Self::InstancePublicKeys(HashSet::new())
34 }
35}
36
37#[derive(Debug)]
39pub enum NetworkEvent<K: KeyType> {
40 InstanceRequestInbound {
42 peer: PeerId,
43 request: InstanceMessageRequest<K>,
44 },
45 InstanceResponseInbound {
47 peer: PeerId,
48 response: InstanceMessageResponse<K>,
49 },
50 InstanceRequestOutbound {
52 peer: PeerId,
53 request: InstanceMessageRequest<K>,
54 },
55 InstanceResponseOutbound {
57 peer: PeerId,
58 response: InstanceMessageResponse<K>,
59 },
60 GossipReceived {
62 source: PeerId,
63 topic: String,
64 message: Vec<u8>,
65 },
66 GossipSent { topic: String, message: Vec<u8> },
68 PeerConnected(PeerId),
70 PeerDisconnected(PeerId),
72 HandshakeCompleted { peer: PeerId },
74 HandshakeFailed { peer: PeerId, reason: String },
76}
77
78#[derive(Debug)]
79pub enum NetworkEventSendError<K: KeyType> {
80 PeerConnected(PeerId),
81 PeerDisconnected(PeerId),
82 HandshakeCompleted {
83 peer: PeerId,
84 },
85 HandshakeFailed {
86 peer: PeerId,
87 reason: String,
88 },
89 InstanceRequestInbound {
90 peer: PeerId,
91 request: InstanceMessageRequest<K>,
92 },
93 InstanceResponseInbound {
94 peer: PeerId,
95 response: InstanceMessageResponse<K>,
96 },
97 InstanceRequestOutbound {
98 peer: PeerId,
99 request: InstanceMessageRequest<K>,
100 },
101 InstanceResponseOutbound {
102 peer: PeerId,
103 response: InstanceMessageResponse<K>,
104 },
105 GossipReceived {
106 source: PeerId,
107 topic: String,
108 message: Vec<u8>,
109 },
110 GossipSent {
111 topic: String,
112 message: Vec<u8>,
113 },
114}
115
116impl<K: KeyType> Display for NetworkEventSendError<K> {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 match self {
119 NetworkEventSendError::PeerConnected(peer) => {
120 write!(f, "Error sending Peer connected event: {}", peer)
121 }
122 NetworkEventSendError::PeerDisconnected(peer) => {
123 write!(f, "Error sending Peer disconnected event: {}", peer)
124 }
125 NetworkEventSendError::HandshakeCompleted { peer } => {
126 write!(f, "Error sending Handshake completed event: {}", peer)
127 }
128 NetworkEventSendError::HandshakeFailed { peer, reason } => {
129 write!(
130 f,
131 "Error sending Handshake failed event: {} ({})",
132 peer, reason
133 )
134 }
135 NetworkEventSendError::InstanceRequestInbound { peer, request } => {
136 write!(
137 f,
138 "Error sending Instance request inbound event: {} ({:#?})",
139 peer, request
140 )
141 }
142 NetworkEventSendError::InstanceResponseInbound { peer, response } => {
143 write!(
144 f,
145 "Error sending Instance response inbound event: {} ({:#?})",
146 peer, response
147 )
148 }
149 NetworkEventSendError::InstanceRequestOutbound { peer, request } => {
150 write!(
151 f,
152 "Error sending Instance request outbound event: {} ({:#?})",
153 peer, request
154 )
155 }
156 NetworkEventSendError::InstanceResponseOutbound { peer, response } => {
157 write!(
158 f,
159 "Error sending Instance response outbound event: {} ({:#?})",
160 peer, response
161 )
162 }
163 NetworkEventSendError::GossipReceived {
164 source,
165 topic,
166 message,
167 } => {
168 write!(
169 f,
170 "Error sending Gossip received event on topic: {} from source: {} ({:#?})",
171 topic, source, message
172 )
173 }
174 NetworkEventSendError::GossipSent { topic, message } => {
175 write!(
176 f,
177 "Error sending Gossip sent event on topic: {} ({:#?})",
178 topic, message
179 )
180 }
181 }
182 }
183}
184
185#[derive(Debug)]
187pub enum NetworkCommandMessage<K: KeyType> {
188 InstanceRequest {
189 peer: PeerId,
190 request: InstanceMessageRequest<K>,
191 },
192 GossipMessage {
193 source: PeerId,
194 topic: String,
195 message: Vec<u8>,
196 },
197 SubscribeToTopic(String),
198 UnsubscribeFromTopic(String),
199}
200
201#[derive(Debug, Clone)]
203pub struct NetworkConfig<K: KeyType> {
204 pub network_name: String,
206 pub instance_id: String,
208 pub instance_key_pair: K::Secret,
210 pub local_key: Keypair,
212 pub listen_addr: Multiaddr,
214 pub target_peer_count: u32,
216 pub bootstrap_peers: Vec<Multiaddr>,
218 pub enable_mdns: bool,
220 pub enable_kademlia: bool,
222 pub using_evm_address_for_handshake_verification: bool,
224}
225
226pub struct NetworkService<K: KeyType> {
227 swarm: Swarm<BlueprintBehaviour<K>>,
229 local_signing_key: K::Secret,
231 pub(crate) peer_manager: Arc<PeerManager<K>>,
233 network_sender: Sender<NetworkCommandMessage<K>>,
235 network_receiver: Receiver<NetworkCommandMessage<K>>,
237 protocol_message_receiver: Receiver<ProtocolMessage>,
239 event_sender: Sender<NetworkEvent<K>>,
241 #[expect(dead_code)] event_receiver: Receiver<NetworkEvent<K>>,
244 bootstrap_peers: HashSet<Multiaddr>,
246 allowed_keys_rx: Receiver<AllowedKeys<K>>,
248}
249
250impl<K: KeyType> NetworkService<K> {
251 #[allow(clippy::missing_panics_doc)] pub fn new(
259 config: NetworkConfig<K>,
260 allowed_keys: AllowedKeys<K>,
261 allowed_keys_rx: Receiver<AllowedKeys<K>>,
262 ) -> Result<Self, Error> {
263 let NetworkConfig::<K> {
264 network_name,
265 instance_id,
266 instance_key_pair,
267 local_key,
268 listen_addr,
269 target_peer_count,
270 bootstrap_peers,
271 enable_mdns: _,
272 enable_kademlia: _,
273 using_evm_address_for_handshake_verification,
274 ..
275 } = config;
276
277 let peer_manager = Arc::new(PeerManager::new(allowed_keys));
278 let blueprint_protocol_name = format!("/{network_name}/{instance_id}");
279
280 let (network_sender, network_receiver) = crossbeam_channel::unbounded();
281 let (protocol_message_sender, protocol_message_receiver) = crossbeam_channel::unbounded();
282 let (event_sender, event_receiver) = crossbeam_channel::unbounded();
283
284 let blueprint_behaviour_config = BlueprintBehaviourConfig {
286 network_name,
287 blueprint_protocol_name: blueprint_protocol_name.clone(),
288 local_key: local_key.clone(),
289 instance_key_pair: instance_key_pair.clone(),
290 target_peer_count,
291 peer_manager: peer_manager.clone(),
292 protocol_message_sender,
293 using_evm_address_for_handshake_verification,
294 };
295 let behaviour = BlueprintBehaviour::new(blueprint_behaviour_config)?;
296
297 let mut swarm = SwarmBuilder::with_existing_identity(local_key)
298 .with_tokio()
299 .with_tcp(
300 libp2p::tcp::Config::default().nodelay(true),
301 libp2p::noise::Config::new,
302 libp2p::yamux::Config::default,
303 )?
304 .with_quic_config(|mut config| {
305 config.handshake_timeout = Duration::from_secs(30);
306 config
307 })
308 .with_dns()?
309 .with_behaviour(|_| behaviour)
310 .unwrap()
311 .build();
312
313 swarm
314 .behaviour_mut()
315 .blueprint_protocol
316 .subscribe(&blueprint_protocol_name)?;
317
318 swarm.listen_on(listen_addr)?;
320 let bootstrap_peers = bootstrap_peers.into_iter().collect();
321
322 Ok(Self {
323 swarm,
324 local_signing_key: instance_key_pair,
325 peer_manager,
326 network_sender,
327 network_receiver,
328 protocol_message_receiver,
329 event_sender,
330 event_receiver,
331 bootstrap_peers,
332 allowed_keys_rx,
333 })
334 }
335
336 pub fn network_sender(&self) -> Sender<NetworkCommandMessage<K>> {
338 self.network_sender.clone()
339 }
340
341 pub fn start(self) -> NetworkServiceHandle<K> {
342 let local_peer_id = *self.swarm.local_peer_id();
343 let network_sender = self.network_sender.clone();
344 let protocol_message_receiver = self.protocol_message_receiver.clone();
345
346 let handle = NetworkServiceHandle::new(
348 local_peer_id,
349 self.swarm
350 .behaviour()
351 .blueprint_protocol
352 .blueprint_protocol_name
353 .clone(),
354 self.local_signing_key.clone(),
355 self.peer_manager.clone(),
356 network_sender,
357 protocol_message_receiver,
358 );
359
360 let mut info = PeerInfo::default();
362 for addr in self.swarm.listeners() {
363 info.addresses.insert(addr.clone());
364 }
365 self.peer_manager.update_peer(local_peer_id, info);
366
367 let peer_manager = self.peer_manager.clone();
369 let allowed_keys_rx = self.allowed_keys_rx.clone();
370 tokio::spawn(async move {
371 peer_manager.run_allowed_keys_updater(&allowed_keys_rx);
372 });
373
374 tokio::spawn(async move {
376 Box::pin(self.run()).await;
377 });
378
379 handle
380 }
381
382 async fn run(mut self) {
384 info!("Starting network service");
385
386 if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
388 warn!("Failed to bootstrap with Kademlia: {}", e);
389 }
390
391 for addr in &self.bootstrap_peers {
393 debug!("Dialing bootstrap peer at {}", addr);
394 if let Err(e) = self.swarm.dial(addr.clone()) {
395 warn!("Failed to dial bootstrap peer: {}", e);
396 }
397 }
398
399 let mut last_handshake_retry = tokio::time::Instant::now();
401 const HANDSHAKE_RETRY_INTERVAL: Duration = Duration::from_secs(3);
403
404 loop {
405 let now = tokio::time::Instant::now();
407 if now.duration_since(last_handshake_retry) >= HANDSHAKE_RETRY_INTERVAL {
408 self.retry_unverified_handshakes();
409 last_handshake_retry = now;
410 }
411
412 tokio::select! {
413 swarm_event = self.swarm.select_next_some() => {
414 match swarm_event {
415 SwarmEvent::NewListenAddr { address, .. } => {
416 info!("New listen address: {}", address);
417 let local_peer_id = *self.swarm.local_peer_id();
418 let mut info = self.peer_manager.get_peer_info(&local_peer_id)
419 .unwrap_or_default();
420 info.addresses.insert(address.clone());
421 self.peer_manager.update_peer(local_peer_id, info);
422 },
423 SwarmEvent::Behaviour(event) => {
424 if let Err(e) = handle_behaviour_event(
425 &mut self.swarm,
426 &self.peer_manager,
427 event,
428 &self.event_sender,
429 )
430 {
431 warn!("Failed to handle swarm event: {}", e);
432 }
433 },
434 _ => {}
435 }
436 }
437 Ok(msg) = async { self.network_receiver.try_recv() } => {
438 if let Err(e) = handle_network_message(
439 &mut self.swarm,
440 msg,
441 &self.peer_manager,
442 &self.event_sender,
443 )
444 {
445 warn!("Failed to handle network message: {}", e);
446 }
447 }
448 () = tokio::time::sleep(Duration::from_millis(100)) => {}
450 else => break,
451 }
452 }
453
454 info!("Network service stopped");
455 }
456
457 fn retry_unverified_handshakes(&mut self) {
459 let connected_peers = self.swarm.behaviour().discovery.get_peers().clone();
460 for peer_id in connected_peers {
461 if self.peer_manager.is_peer_verified(&peer_id) || self.peer_manager.is_banned(&peer_id)
463 {
464 continue;
465 }
466
467 debug!("Retrying handshake with unverified peer: {}", peer_id);
468 if let Err(e) = self
469 .swarm
470 .behaviour_mut()
471 .blueprint_protocol
472 .send_handshake(&peer_id)
473 {
474 debug!("Failed to retry handshake with peer {}: {:?}", peer_id, e);
475 }
476 }
477 }
478
479 pub fn get_listen_addr(&self) -> Option<Multiaddr> {
481 self.swarm.listeners().next().cloned()
482 }
483}
484
485fn handle_behaviour_event<K: KeyType>(
487 swarm: &mut Swarm<BlueprintBehaviour<K>>,
488 peer_manager: &Arc<PeerManager<K>>,
489 event: BlueprintBehaviourEvent<K>,
490 event_sender: &Sender<NetworkEvent<K>>,
491) -> Result<(), Error> {
492 match event {
493 BlueprintBehaviourEvent::ConnectionLimits(_) => {}
494 BlueprintBehaviourEvent::Discovery(discovery_event) => {
495 handle_discovery_event(swarm, peer_manager, discovery_event, event_sender)?;
496 }
497 BlueprintBehaviourEvent::BlueprintProtocol(blueprint_event) => {
498 handle_blueprint_protocol_event(swarm, peer_manager, blueprint_event, event_sender)?;
499 }
500 BlueprintBehaviourEvent::Ping(ping_event) => {
501 handle_ping_event(swarm, peer_manager, ping_event, event_sender)?;
502 }
503 }
504
505 Ok(())
506}
507
508fn handle_discovery_event<K: KeyType>(
510 swarm: &mut Swarm<BlueprintBehaviour<K>>,
511 peer_manager: &Arc<PeerManager<K>>,
512 event: DiscoveryEvent,
513 event_sender: &Sender<NetworkEvent<K>>,
514) -> Result<(), Error> {
515 match event {
516 DiscoveryEvent::PeerConnected(peer_id) => {
517 info!("Peer connected, {peer_id}");
518 if let Some(info) = swarm.behaviour().discovery.peer_info.get(&peer_id) {
520 peer_manager.update_peer(peer_id, info.clone());
521 }
522 event_sender
523 .send(NetworkEvent::PeerConnected(peer_id))
524 .map_err(|_| {
525 SendError(NetworkEventSendError::<K>::PeerConnected(peer_id).to_string())
526 })?;
527 }
528 DiscoveryEvent::PeerDisconnected(peer_id) => {
529 info!("Peer disconnected, {peer_id}");
530 peer_manager.remove_peer(&peer_id, "disconnected");
531 event_sender
532 .send(NetworkEvent::PeerDisconnected(peer_id))
533 .map_err(|_| {
534 SendError(NetworkEventSendError::<K>::PeerDisconnected(peer_id).to_string())
535 })?;
536 }
537 DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
538 DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
539 peer_id,
540 info,
541 ..
542 }) => {
543 info!(%peer_id, "Received identify event");
544 let protocols: HashSet<String> = info
545 .protocols
546 .iter()
547 .map(std::string::ToString::to_string)
548 .collect();
549
550 trace!(%peer_id, ?protocols, "Supported protocols");
551
552 let blueprint_protocol_name =
553 &swarm.behaviour().blueprint_protocol.blueprint_protocol_name;
554 if !protocols.contains(blueprint_protocol_name) {
555 warn!(%peer_id, %blueprint_protocol_name, "Peer does not support required protocol");
556 peer_manager.ban_peer_with_default_duration(*peer_id, "protocol unsupported");
557 return Ok(());
558 }
559
560 let mut peer_info = peer_manager.get_peer_info(peer_id).unwrap_or_default();
562
563 peer_info.identify_info = Some(info.clone());
565
566 trace!(%peer_id, listen_addrs=?info.listen_addrs, "Adding identify addresses");
567 for addr in &info.listen_addrs {
569 peer_info.addresses.insert(addr.clone());
570 }
571
572 trace!(%peer_id, "Updating peer info with identify information");
573 peer_manager.update_peer(*peer_id, peer_info);
574 debug!(%peer_id, "Successfully processed identify information");
575 }
576 DerivedDiscoveryBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed {
577 result: kad::QueryResult::GetClosestPeers(Ok(ok)),
578 ..
579 }) => {
580 for peer_info in &ok.peers {
582 if !peer_manager.get_peers().contains_key(&peer_info.peer_id) {
583 info!(%peer_info.peer_id, "Newly discovered peer from Kademlia");
584 let info = PeerInfo::default();
585 peer_manager.update_peer(peer_info.peer_id, info);
586 let addrs: Vec<_> = peer_info.addrs.clone();
587 for addr in addrs {
588 debug!(%peer_info.peer_id, %addr, "Dialing peer from Kademlia");
589 if let Err(e) = swarm.dial(DialOpts::from(addr)) {
590 warn!("Failed to dial address: {}", e);
591 }
592 }
593 }
594 }
595 }
596 DerivedDiscoveryBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => {
597 for (peer_id, addr) in list {
599 if !peer_manager.get_peers().contains_key(peer_id) {
600 info!(%peer_id, %addr, "Newly discovered peer from Mdns");
601 let mut info = PeerInfo::default();
602 info.addresses.insert(addr.clone());
603 peer_manager.update_peer(*peer_id, info);
604 debug!(%peer_id, %addr, "Dialing peer from Mdns");
605 if let Err(e) = swarm.dial(DialOpts::from(addr.clone())) {
606 warn!("Failed to dial address: {}", e);
607 }
608 }
609 }
610 }
611 _ => {}
612 },
613 }
614
615 Ok(())
616}
617
618fn handle_blueprint_protocol_event<K: KeyType>(
620 _swarm: &mut Swarm<BlueprintBehaviour<K>>,
621 _peer_manager: &Arc<PeerManager<K>>,
622 event: BlueprintProtocolEvent<K>,
623 event_sender: &Sender<NetworkEvent<K>>,
624) -> Result<(), Error> {
625 match event {
626 BlueprintProtocolEvent::Request {
627 peer,
628 request,
629 channel: _,
630 } => event_sender
631 .send(NetworkEvent::InstanceRequestInbound {
632 peer,
633 request: request.clone(),
634 })
635 .map_err(|_| {
636 SendError(
637 NetworkEventSendError::<K>::InstanceRequestInbound { peer, request }
638 .to_string(),
639 )
640 })?,
641 BlueprintProtocolEvent::Response {
642 peer,
643 response,
644 request_id: _,
645 } => event_sender
646 .send(NetworkEvent::InstanceResponseInbound {
647 peer,
648 response: response.clone(),
649 })
650 .map_err(|_| {
651 SendError(
652 NetworkEventSendError::<K>::InstanceResponseInbound { peer, response }
653 .to_string(),
654 )
655 })?,
656 BlueprintProtocolEvent::GossipMessage {
657 source,
658 topic,
659 message,
660 } => event_sender
661 .send(NetworkEvent::GossipReceived {
662 source,
663 topic: topic.to_string(),
664 message: message.clone(),
665 })
666 .map_err(|_| {
667 SendError(
668 NetworkEventSendError::<K>::GossipReceived {
669 source,
670 topic: topic.to_string(),
671 message,
672 }
673 .to_string(),
674 )
675 })?,
676 }
677
678 Ok(())
679}
680
681#[expect(clippy::unnecessary_wraps)]
683fn handle_ping_event<K: KeyType>(
684 _swarm: &mut Swarm<BlueprintBehaviour<K>>,
685 _peer_manager: &Arc<PeerManager<K>>,
686 event: ping::Event,
687 _event_sender: &Sender<NetworkEvent<K>>,
688) -> Result<(), Error> {
689 match event.result {
690 Ok(rtt) => {
691 trace!(
692 "PingSuccess::Ping rtt to {} is {} ms",
693 event.peer,
694 rtt.as_millis()
695 );
696 }
697 Err(ping::Failure::Unsupported) => {
698 debug!(peer=%event.peer, "Ping protocol unsupported");
699 }
700 Err(ping::Failure::Timeout) => {
701 debug!("Ping timeout: {}", event.peer);
702 }
703 Err(ping::Failure::Other { error }) => {
704 debug!("Ping failure: {error}");
705 }
706 }
707
708 Ok(())
709}
710
711fn handle_network_message<K: KeyType>(
713 swarm: &mut Swarm<BlueprintBehaviour<K>>,
714 msg: NetworkCommandMessage<K>,
715 peer_manager: &Arc<PeerManager<K>>,
716 event_sender: &Sender<NetworkEvent<K>>,
717) -> Result<(), Error> {
718 match msg {
719 NetworkCommandMessage::InstanceRequest { peer, request } => {
720 if !peer_manager.is_peer_verified(&peer) {
722 warn!(%peer, "Attempted to send request to unverified peer");
723 return Ok(());
724 }
725
726 debug!(%peer, ?request, "Sending instance request");
727 swarm
728 .behaviour_mut()
729 .blueprint_protocol
730 .send_request(&peer, request.clone());
731 event_sender
732 .send(NetworkEvent::InstanceRequestOutbound {
733 peer,
734 request: request.clone(),
735 })
736 .map_err(|_| {
737 SendError(
738 NetworkEventSendError::<K>::InstanceRequestOutbound { peer, request }
739 .to_string(),
740 )
741 })?;
742 }
743 NetworkCommandMessage::GossipMessage {
744 source,
745 topic,
746 message,
747 } => {
748 debug!(%source, %topic, "Publishing gossip message");
749 if let Err(e) = swarm
750 .behaviour_mut()
751 .blueprint_protocol
752 .publish(&topic, message.clone())
753 {
754 warn!(%source, %topic, "Failed to publish gossip message: {:?}", e);
755 return Ok(());
756 }
757 event_sender
758 .send(NetworkEvent::GossipSent {
759 topic: topic.to_string(),
760 message: message.clone(),
761 })
762 .map_err(|_| {
763 SendError(NetworkEventSendError::<K>::GossipSent { topic, message }.to_string())
764 })?;
765 }
766 NetworkCommandMessage::SubscribeToTopic(topic) => {
767 swarm.behaviour_mut().blueprint_protocol.subscribe(&topic)?;
768 }
769 NetworkCommandMessage::UnsubscribeFromTopic(topic) => {
770 swarm.behaviour_mut().blueprint_protocol.unsubscribe(&topic);
771 }
772 }
773
774 Ok(())
775}