blueprint_networking/
service.rs

1use crate::{
2    behaviours::{BlueprintBehaviour, BlueprintBehaviourConfig, BlueprintBehaviourEvent},
3    blueprint_protocol::{BlueprintProtocolEvent, InstanceMessageRequest, InstanceMessageResponse},
4    discovery::{
5        PeerInfo, PeerManager,
6        behaviour::{DerivedDiscoveryBehaviourEvent, DiscoveryEvent},
7    },
8    error::Error,
9    service_handle::NetworkServiceHandle,
10    types::ProtocolMessage,
11};
12use alloy_primitives::Address;
13use blueprint_core::{debug, info, trace, warn};
14use blueprint_crypto::KeyType;
15use blueprint_std::{fmt::Display, sync::Arc, time::Duration};
16use crossbeam_channel::{self, Receiver, SendError, Sender};
17use futures::StreamExt;
18use libp2p::{
19    Multiaddr, PeerId, Swarm, SwarmBuilder, identify,
20    identity::Keypair,
21    kad, mdns, ping,
22    swarm::{SwarmEvent, dial_opts::DialOpts},
23};
24use std::collections::HashSet;
25
26pub enum AllowedKeys<K: KeyType> {
27    EvmAddresses(HashSet<Address>),
28    InstancePublicKeys(HashSet<K::Public>),
29}
30
31impl<K: KeyType> Default for AllowedKeys<K> {
32    fn default() -> Self {
33        Self::InstancePublicKeys(HashSet::new())
34    }
35}
36
37/// Events emitted by the network service
38#[derive(Debug)]
39pub enum NetworkEvent<K: KeyType> {
40    /// New request received from a peer
41    InstanceRequestInbound {
42        peer: PeerId,
43        request: InstanceMessageRequest<K>,
44    },
45    /// New response received from a peer
46    InstanceResponseInbound {
47        peer: PeerId,
48        response: InstanceMessageResponse<K>,
49    },
50    /// New request sent to a peer
51    InstanceRequestOutbound {
52        peer: PeerId,
53        request: InstanceMessageRequest<K>,
54    },
55    /// Response sent to a peer
56    InstanceResponseOutbound {
57        peer: PeerId,
58        response: InstanceMessageResponse<K>,
59    },
60    /// New gossip message received
61    GossipReceived {
62        source: PeerId,
63        topic: String,
64        message: Vec<u8>,
65    },
66    /// New gossip message sent
67    GossipSent { topic: String, message: Vec<u8> },
68    /// Peer connected
69    PeerConnected(PeerId),
70    /// Peer disconnected
71    PeerDisconnected(PeerId),
72    /// Handshake completed successfully
73    HandshakeCompleted { peer: PeerId },
74    /// Handshake failed
75    HandshakeFailed { peer: PeerId, reason: String },
76}
77
78#[derive(Debug)]
79pub enum NetworkEventSendError<K: KeyType> {
80    PeerConnected(PeerId),
81    PeerDisconnected(PeerId),
82    HandshakeCompleted {
83        peer: PeerId,
84    },
85    HandshakeFailed {
86        peer: PeerId,
87        reason: String,
88    },
89    InstanceRequestInbound {
90        peer: PeerId,
91        request: InstanceMessageRequest<K>,
92    },
93    InstanceResponseInbound {
94        peer: PeerId,
95        response: InstanceMessageResponse<K>,
96    },
97    InstanceRequestOutbound {
98        peer: PeerId,
99        request: InstanceMessageRequest<K>,
100    },
101    InstanceResponseOutbound {
102        peer: PeerId,
103        response: InstanceMessageResponse<K>,
104    },
105    GossipReceived {
106        source: PeerId,
107        topic: String,
108        message: Vec<u8>,
109    },
110    GossipSent {
111        topic: String,
112        message: Vec<u8>,
113    },
114}
115
116impl<K: KeyType> Display for NetworkEventSendError<K> {
117    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118        match self {
119            NetworkEventSendError::PeerConnected(peer) => {
120                write!(f, "Error sending Peer connected event: {}", peer)
121            }
122            NetworkEventSendError::PeerDisconnected(peer) => {
123                write!(f, "Error sending Peer disconnected event: {}", peer)
124            }
125            NetworkEventSendError::HandshakeCompleted { peer } => {
126                write!(f, "Error sending Handshake completed event: {}", peer)
127            }
128            NetworkEventSendError::HandshakeFailed { peer, reason } => {
129                write!(
130                    f,
131                    "Error sending Handshake failed event: {} ({})",
132                    peer, reason
133                )
134            }
135            NetworkEventSendError::InstanceRequestInbound { peer, request } => {
136                write!(
137                    f,
138                    "Error sending Instance request inbound event: {} ({:#?})",
139                    peer, request
140                )
141            }
142            NetworkEventSendError::InstanceResponseInbound { peer, response } => {
143                write!(
144                    f,
145                    "Error sending Instance response inbound event: {} ({:#?})",
146                    peer, response
147                )
148            }
149            NetworkEventSendError::InstanceRequestOutbound { peer, request } => {
150                write!(
151                    f,
152                    "Error sending Instance request outbound event: {} ({:#?})",
153                    peer, request
154                )
155            }
156            NetworkEventSendError::InstanceResponseOutbound { peer, response } => {
157                write!(
158                    f,
159                    "Error sending Instance response outbound event: {} ({:#?})",
160                    peer, response
161                )
162            }
163            NetworkEventSendError::GossipReceived {
164                source,
165                topic,
166                message,
167            } => {
168                write!(
169                    f,
170                    "Error sending Gossip received event on topic: {} from source: {} ({:#?})",
171                    topic, source, message
172                )
173            }
174            NetworkEventSendError::GossipSent { topic, message } => {
175                write!(
176                    f,
177                    "Error sending Gossip sent event on topic: {} ({:#?})",
178                    topic, message
179                )
180            }
181        }
182    }
183}
184
185/// Network message types
186#[derive(Debug)]
187pub enum NetworkCommandMessage<K: KeyType> {
188    InstanceRequest {
189        peer: PeerId,
190        request: InstanceMessageRequest<K>,
191    },
192    GossipMessage {
193        source: PeerId,
194        topic: String,
195        message: Vec<u8>,
196    },
197    SubscribeToTopic(String),
198    UnsubscribeFromTopic(String),
199}
200
201/// Configuration for the network service
202#[derive(Debug, Clone)]
203pub struct NetworkConfig<K: KeyType> {
204    /// Network name/namespace
205    pub network_name: String,
206    /// Instance id for blueprint protocol
207    pub instance_id: String,
208    /// Instance secret key for blueprint protocol
209    pub instance_key_pair: K::Secret,
210    /// Local keypair for authentication
211    pub local_key: Keypair,
212    /// Address to listen on
213    pub listen_addr: Multiaddr,
214    /// Target number of peers to maintain
215    pub target_peer_count: u32,
216    /// Bootstrap peers to connect to
217    pub bootstrap_peers: Vec<Multiaddr>,
218    /// Whether to enable mDNS discovery
219    pub enable_mdns: bool,
220    /// Whether to enable Kademlia DHT
221    pub enable_kademlia: bool,
222    /// Whether to use evm addresses for verification of handshakes and msgs
223    pub using_evm_address_for_handshake_verification: bool,
224}
225
226pub struct NetworkService<K: KeyType> {
227    /// The libp2p swarm
228    swarm: Swarm<BlueprintBehaviour<K>>,
229    /// The local signing key
230    local_signing_key: K::Secret,
231    /// Peer manager for tracking peer states
232    pub(crate) peer_manager: Arc<PeerManager<K>>,
233    /// Channel for sending messages to the network service
234    network_sender: Sender<NetworkCommandMessage<K>>,
235    /// Channel for receiving messages from the network service
236    network_receiver: Receiver<NetworkCommandMessage<K>>,
237    /// Channel for receiving messages from the network service
238    protocol_message_receiver: Receiver<ProtocolMessage>,
239    /// Channel for sending events to the network service
240    event_sender: Sender<NetworkEvent<K>>,
241    /// Channel for receiving events from the network service
242    #[expect(dead_code)] // For future use
243    event_receiver: Receiver<NetworkEvent<K>>,
244    /// Bootstrap peers
245    bootstrap_peers: HashSet<Multiaddr>,
246    /// Channel for receiving allowed keys updates
247    allowed_keys_rx: Receiver<AllowedKeys<K>>,
248}
249
250impl<K: KeyType> NetworkService<K> {
251    /// Create a new network service
252    ///
253    /// # Errors
254    ///
255    /// * See [`BlueprintBehaviour::new`]
256    /// * Bad `listen_addr` in the provided [`NetworkConfig`]
257    #[allow(clippy::missing_panics_doc)] // Unwrapping an Infallible
258    pub fn new(
259        config: NetworkConfig<K>,
260        allowed_keys: AllowedKeys<K>,
261        allowed_keys_rx: Receiver<AllowedKeys<K>>,
262    ) -> Result<Self, Error> {
263        let NetworkConfig::<K> {
264            network_name,
265            instance_id,
266            instance_key_pair,
267            local_key,
268            listen_addr,
269            target_peer_count,
270            bootstrap_peers,
271            enable_mdns: _,
272            enable_kademlia: _,
273            using_evm_address_for_handshake_verification,
274            ..
275        } = config;
276
277        let peer_manager = Arc::new(PeerManager::new(allowed_keys));
278        let blueprint_protocol_name = format!("/{network_name}/{instance_id}");
279
280        let (network_sender, network_receiver) = crossbeam_channel::unbounded();
281        let (protocol_message_sender, protocol_message_receiver) = crossbeam_channel::unbounded();
282        let (event_sender, event_receiver) = crossbeam_channel::unbounded();
283
284        // Create the swarm
285        let blueprint_behaviour_config = BlueprintBehaviourConfig {
286            network_name,
287            blueprint_protocol_name: blueprint_protocol_name.clone(),
288            local_key: local_key.clone(),
289            instance_key_pair: instance_key_pair.clone(),
290            target_peer_count,
291            peer_manager: peer_manager.clone(),
292            protocol_message_sender,
293            using_evm_address_for_handshake_verification,
294        };
295        let behaviour = BlueprintBehaviour::new(blueprint_behaviour_config)?;
296
297        let mut swarm = SwarmBuilder::with_existing_identity(local_key)
298            .with_tokio()
299            .with_tcp(
300                libp2p::tcp::Config::default().nodelay(true),
301                libp2p::noise::Config::new,
302                libp2p::yamux::Config::default,
303            )?
304            .with_quic_config(|mut config| {
305                config.handshake_timeout = Duration::from_secs(30);
306                config
307            })
308            .with_dns()?
309            .with_behaviour(|_| behaviour)
310            .unwrap()
311            .build();
312
313        swarm
314            .behaviour_mut()
315            .blueprint_protocol
316            .subscribe(&blueprint_protocol_name)?;
317
318        // Start listening
319        swarm.listen_on(listen_addr)?;
320        let bootstrap_peers = bootstrap_peers.into_iter().collect();
321
322        Ok(Self {
323            swarm,
324            local_signing_key: instance_key_pair,
325            peer_manager,
326            network_sender,
327            network_receiver,
328            protocol_message_receiver,
329            event_sender,
330            event_receiver,
331            bootstrap_peers,
332            allowed_keys_rx,
333        })
334    }
335
336    /// Get a sender to send messages to the network service
337    pub fn network_sender(&self) -> Sender<NetworkCommandMessage<K>> {
338        self.network_sender.clone()
339    }
340
341    pub fn start(self) -> NetworkServiceHandle<K> {
342        let local_peer_id = *self.swarm.local_peer_id();
343        let network_sender = self.network_sender.clone();
344        let protocol_message_receiver = self.protocol_message_receiver.clone();
345
346        // Create handle with new interface
347        let handle = NetworkServiceHandle::new(
348            local_peer_id,
349            self.swarm
350                .behaviour()
351                .blueprint_protocol
352                .blueprint_protocol_name
353                .clone(),
354            self.local_signing_key.clone(),
355            self.peer_manager.clone(),
356            network_sender,
357            protocol_message_receiver,
358        );
359
360        // Add our own peer ID to the peer manager with all listening addresses
361        let mut info = PeerInfo::default();
362        for addr in self.swarm.listeners() {
363            info.addresses.insert(addr.clone());
364        }
365        self.peer_manager.update_peer(local_peer_id, info);
366
367        // Start allowed keys updater
368        let peer_manager = self.peer_manager.clone();
369        let allowed_keys_rx = self.allowed_keys_rx.clone();
370        tokio::spawn(async move {
371            peer_manager.run_allowed_keys_updater(&allowed_keys_rx);
372        });
373
374        // Spawn background task
375        tokio::spawn(async move {
376            Box::pin(self.run()).await;
377        });
378
379        handle
380    }
381
382    /// Run the network service
383    async fn run(mut self) {
384        info!("Starting network service");
385
386        // Bootstrap with Kademlia
387        if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
388            warn!("Failed to bootstrap with Kademlia: {}", e);
389        }
390
391        // Connect to bootstrap peers
392        for addr in &self.bootstrap_peers {
393            debug!("Dialing bootstrap peer at {}", addr);
394            if let Err(e) = self.swarm.dial(addr.clone()) {
395                warn!("Failed to dial bootstrap peer: {}", e);
396            }
397        }
398
399        // Track when we last attempted to retry handshakes
400        let mut last_handshake_retry = tokio::time::Instant::now();
401        // Retry unverified handshakes every 5 seconds
402        const HANDSHAKE_RETRY_INTERVAL: Duration = Duration::from_secs(3);
403
404        loop {
405            // Check if we should retry handshakes for unverified peers
406            let now = tokio::time::Instant::now();
407            if now.duration_since(last_handshake_retry) >= HANDSHAKE_RETRY_INTERVAL {
408                self.retry_unverified_handshakes();
409                last_handshake_retry = now;
410            }
411
412            tokio::select! {
413                swarm_event = self.swarm.select_next_some() => {
414                    match swarm_event {
415                        SwarmEvent::NewListenAddr { address, .. } => {
416                            info!("New listen address: {}", address);
417                            let local_peer_id = *self.swarm.local_peer_id();
418                            let mut info = self.peer_manager.get_peer_info(&local_peer_id)
419                                .unwrap_or_default();
420                            info.addresses.insert(address.clone());
421                            self.peer_manager.update_peer(local_peer_id, info);
422                        },
423                        SwarmEvent::Behaviour(event) => {
424                            if let Err(e) = handle_behaviour_event(
425                                &mut self.swarm,
426                                &self.peer_manager,
427                                event,
428                                &self.event_sender,
429                            )
430                            {
431                                warn!("Failed to handle swarm event: {}", e);
432                            }
433                        },
434                        _ => {}
435                    }
436                }
437                Ok(msg) = async { self.network_receiver.try_recv() } => {
438                    if let Err(e) = handle_network_message(
439                        &mut self.swarm,
440                        msg,
441                        &self.peer_manager,
442                        &self.event_sender,
443                    )
444                    {
445                        warn!("Failed to handle network message: {}", e);
446                    }
447                }
448                // Add a short timeout to ensure we don't miss the handshake retry check
449                () = tokio::time::sleep(Duration::from_millis(100)) => {}
450                else => break,
451            }
452        }
453
454        info!("Network service stopped");
455    }
456
457    /// Attempts to initiate handshakes with connected but unverified peers
458    fn retry_unverified_handshakes(&mut self) {
459        let connected_peers = self.swarm.behaviour().discovery.get_peers().clone();
460        for peer_id in connected_peers {
461            // Skip peers that are already verified or banned
462            if self.peer_manager.is_peer_verified(&peer_id) || self.peer_manager.is_banned(&peer_id)
463            {
464                continue;
465            }
466
467            debug!("Retrying handshake with unverified peer: {}", peer_id);
468            if let Err(e) = self
469                .swarm
470                .behaviour_mut()
471                .blueprint_protocol
472                .send_handshake(&peer_id)
473            {
474                debug!("Failed to retry handshake with peer {}: {:?}", peer_id, e);
475            }
476        }
477    }
478
479    /// Get the current listening address
480    pub fn get_listen_addr(&self) -> Option<Multiaddr> {
481        self.swarm.listeners().next().cloned()
482    }
483}
484
485/// Handle a behaviour event
486fn handle_behaviour_event<K: KeyType>(
487    swarm: &mut Swarm<BlueprintBehaviour<K>>,
488    peer_manager: &Arc<PeerManager<K>>,
489    event: BlueprintBehaviourEvent<K>,
490    event_sender: &Sender<NetworkEvent<K>>,
491) -> Result<(), Error> {
492    match event {
493        BlueprintBehaviourEvent::ConnectionLimits(_) => {}
494        BlueprintBehaviourEvent::Discovery(discovery_event) => {
495            handle_discovery_event(swarm, peer_manager, discovery_event, event_sender)?;
496        }
497        BlueprintBehaviourEvent::BlueprintProtocol(blueprint_event) => {
498            handle_blueprint_protocol_event(swarm, peer_manager, blueprint_event, event_sender)?;
499        }
500        BlueprintBehaviourEvent::Ping(ping_event) => {
501            handle_ping_event(swarm, peer_manager, ping_event, event_sender)?;
502        }
503    }
504
505    Ok(())
506}
507
508/// Handle a discovery event
509fn handle_discovery_event<K: KeyType>(
510    swarm: &mut Swarm<BlueprintBehaviour<K>>,
511    peer_manager: &Arc<PeerManager<K>>,
512    event: DiscoveryEvent,
513    event_sender: &Sender<NetworkEvent<K>>,
514) -> Result<(), Error> {
515    match event {
516        DiscoveryEvent::PeerConnected(peer_id) => {
517            info!("Peer connected, {peer_id}");
518            // Update peer info when connected
519            if let Some(info) = swarm.behaviour().discovery.peer_info.get(&peer_id) {
520                peer_manager.update_peer(peer_id, info.clone());
521            }
522            event_sender
523                .send(NetworkEvent::PeerConnected(peer_id))
524                .map_err(|_| {
525                    SendError(NetworkEventSendError::<K>::PeerConnected(peer_id).to_string())
526                })?;
527        }
528        DiscoveryEvent::PeerDisconnected(peer_id) => {
529            info!("Peer disconnected, {peer_id}");
530            peer_manager.remove_peer(&peer_id, "disconnected");
531            event_sender
532                .send(NetworkEvent::PeerDisconnected(peer_id))
533                .map_err(|_| {
534                    SendError(NetworkEventSendError::<K>::PeerDisconnected(peer_id).to_string())
535                })?;
536        }
537        DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
538            DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
539                peer_id,
540                info,
541                ..
542            }) => {
543                info!(%peer_id, "Received identify event");
544                let protocols: HashSet<String> = info
545                    .protocols
546                    .iter()
547                    .map(std::string::ToString::to_string)
548                    .collect();
549
550                trace!(%peer_id, ?protocols, "Supported protocols");
551
552                let blueprint_protocol_name =
553                    &swarm.behaviour().blueprint_protocol.blueprint_protocol_name;
554                if !protocols.contains(blueprint_protocol_name) {
555                    warn!(%peer_id, %blueprint_protocol_name, "Peer does not support required protocol");
556                    peer_manager.ban_peer_with_default_duration(*peer_id, "protocol unsupported");
557                    return Ok(());
558                }
559
560                // Get existing peer info or create new one
561                let mut peer_info = peer_manager.get_peer_info(peer_id).unwrap_or_default();
562
563                // Update identify info
564                peer_info.identify_info = Some(info.clone());
565
566                trace!(%peer_id, listen_addrs=?info.listen_addrs, "Adding identify addresses");
567                // Add all addresses from identify info
568                for addr in &info.listen_addrs {
569                    peer_info.addresses.insert(addr.clone());
570                }
571
572                trace!(%peer_id, "Updating peer info with identify information");
573                peer_manager.update_peer(*peer_id, peer_info);
574                debug!(%peer_id, "Successfully processed identify information");
575            }
576            DerivedDiscoveryBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed {
577                result: kad::QueryResult::GetClosestPeers(Ok(ok)),
578                ..
579            }) => {
580                // Process newly discovered peers
581                for peer_info in &ok.peers {
582                    if !peer_manager.get_peers().contains_key(&peer_info.peer_id) {
583                        info!(%peer_info.peer_id, "Newly discovered peer from Kademlia");
584                        let info = PeerInfo::default();
585                        peer_manager.update_peer(peer_info.peer_id, info);
586                        let addrs: Vec<_> = peer_info.addrs.clone();
587                        for addr in addrs {
588                            debug!(%peer_info.peer_id, %addr, "Dialing peer from Kademlia");
589                            if let Err(e) = swarm.dial(DialOpts::from(addr)) {
590                                warn!("Failed to dial address: {}", e);
591                            }
592                        }
593                    }
594                }
595            }
596            DerivedDiscoveryBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => {
597                // Add newly discovered peers from mDNS
598                for (peer_id, addr) in list {
599                    if !peer_manager.get_peers().contains_key(peer_id) {
600                        info!(%peer_id, %addr, "Newly discovered peer from Mdns");
601                        let mut info = PeerInfo::default();
602                        info.addresses.insert(addr.clone());
603                        peer_manager.update_peer(*peer_id, info);
604                        debug!(%peer_id, %addr, "Dialing peer from Mdns");
605                        if let Err(e) = swarm.dial(DialOpts::from(addr.clone())) {
606                            warn!("Failed to dial address: {}", e);
607                        }
608                    }
609                }
610            }
611            _ => {}
612        },
613    }
614
615    Ok(())
616}
617
618/// Handle a blueprint event
619fn handle_blueprint_protocol_event<K: KeyType>(
620    _swarm: &mut Swarm<BlueprintBehaviour<K>>,
621    _peer_manager: &Arc<PeerManager<K>>,
622    event: BlueprintProtocolEvent<K>,
623    event_sender: &Sender<NetworkEvent<K>>,
624) -> Result<(), Error> {
625    match event {
626        BlueprintProtocolEvent::Request {
627            peer,
628            request,
629            channel: _,
630        } => event_sender
631            .send(NetworkEvent::InstanceRequestInbound {
632                peer,
633                request: request.clone(),
634            })
635            .map_err(|_| {
636                SendError(
637                    NetworkEventSendError::<K>::InstanceRequestInbound { peer, request }
638                        .to_string(),
639                )
640            })?,
641        BlueprintProtocolEvent::Response {
642            peer,
643            response,
644            request_id: _,
645        } => event_sender
646            .send(NetworkEvent::InstanceResponseInbound {
647                peer,
648                response: response.clone(),
649            })
650            .map_err(|_| {
651                SendError(
652                    NetworkEventSendError::<K>::InstanceResponseInbound { peer, response }
653                        .to_string(),
654                )
655            })?,
656        BlueprintProtocolEvent::GossipMessage {
657            source,
658            topic,
659            message,
660        } => event_sender
661            .send(NetworkEvent::GossipReceived {
662                source,
663                topic: topic.to_string(),
664                message: message.clone(),
665            })
666            .map_err(|_| {
667                SendError(
668                    NetworkEventSendError::<K>::GossipReceived {
669                        source,
670                        topic: topic.to_string(),
671                        message,
672                    }
673                    .to_string(),
674                )
675            })?,
676    }
677
678    Ok(())
679}
680
681/// Handle a ping event
682#[expect(clippy::unnecessary_wraps)]
683fn handle_ping_event<K: KeyType>(
684    _swarm: &mut Swarm<BlueprintBehaviour<K>>,
685    _peer_manager: &Arc<PeerManager<K>>,
686    event: ping::Event,
687    _event_sender: &Sender<NetworkEvent<K>>,
688) -> Result<(), Error> {
689    match event.result {
690        Ok(rtt) => {
691            trace!(
692                "PingSuccess::Ping rtt to {} is {} ms",
693                event.peer,
694                rtt.as_millis()
695            );
696        }
697        Err(ping::Failure::Unsupported) => {
698            debug!(peer=%event.peer, "Ping protocol unsupported");
699        }
700        Err(ping::Failure::Timeout) => {
701            debug!("Ping timeout: {}", event.peer);
702        }
703        Err(ping::Failure::Other { error }) => {
704            debug!("Ping failure: {error}");
705        }
706    }
707
708    Ok(())
709}
710
711/// Handle a network message
712fn handle_network_message<K: KeyType>(
713    swarm: &mut Swarm<BlueprintBehaviour<K>>,
714    msg: NetworkCommandMessage<K>,
715    peer_manager: &Arc<PeerManager<K>>,
716    event_sender: &Sender<NetworkEvent<K>>,
717) -> Result<(), Error> {
718    match msg {
719        NetworkCommandMessage::InstanceRequest { peer, request } => {
720            // Only send requests to verified peers
721            if !peer_manager.is_peer_verified(&peer) {
722                warn!(%peer, "Attempted to send request to unverified peer");
723                return Ok(());
724            }
725
726            debug!(%peer, ?request, "Sending instance request");
727            swarm
728                .behaviour_mut()
729                .blueprint_protocol
730                .send_request(&peer, request.clone());
731            event_sender
732                .send(NetworkEvent::InstanceRequestOutbound {
733                    peer,
734                    request: request.clone(),
735                })
736                .map_err(|_| {
737                    SendError(
738                        NetworkEventSendError::<K>::InstanceRequestOutbound { peer, request }
739                            .to_string(),
740                    )
741                })?;
742        }
743        NetworkCommandMessage::GossipMessage {
744            source,
745            topic,
746            message,
747        } => {
748            debug!(%source, %topic, "Publishing gossip message");
749            if let Err(e) = swarm
750                .behaviour_mut()
751                .blueprint_protocol
752                .publish(&topic, message.clone())
753            {
754                warn!(%source, %topic, "Failed to publish gossip message: {:?}", e);
755                return Ok(());
756            }
757            event_sender
758                .send(NetworkEvent::GossipSent {
759                    topic: topic.to_string(),
760                    message: message.clone(),
761                })
762                .map_err(|_| {
763                    SendError(NetworkEventSendError::<K>::GossipSent { topic, message }.to_string())
764                })?;
765        }
766        NetworkCommandMessage::SubscribeToTopic(topic) => {
767            swarm.behaviour_mut().blueprint_protocol.subscribe(&topic)?;
768        }
769        NetworkCommandMessage::UnsubscribeFromTopic(topic) => {
770            swarm.behaviour_mut().blueprint_protocol.unsubscribe(&topic);
771        }
772    }
773
774    Ok(())
775}