use crate::{
behaviours::{BlueprintBehaviour, BlueprintBehaviourConfig, BlueprintBehaviourEvent},
blueprint_protocol::{BlueprintProtocolEvent, InstanceMessageRequest, InstanceMessageResponse},
discovery::{
PeerInfo, PeerManager,
behaviour::{DerivedDiscoveryBehaviourEvent, DiscoveryEvent},
},
error::Error,
service_handle::NetworkServiceHandle,
types::ProtocolMessage,
};
use alloy_primitives::Address;
use blueprint_core::{debug, info, trace, warn};
use blueprint_crypto::KeyType;
use blueprint_std::{fmt::Display, sync::Arc, time::Duration};
use crossbeam_channel::{self, Receiver, SendError, Sender};
use futures::StreamExt;
use libp2p::{
Multiaddr, PeerId, Swarm, SwarmBuilder, identify,
identity::Keypair,
kad, mdns, ping,
swarm::{SwarmEvent, dial_opts::DialOpts},
};
use std::collections::HashSet;
pub enum AllowedKeys<K: KeyType> {
EvmAddresses(HashSet<Address>),
InstancePublicKeys(HashSet<K::Public>),
}
impl<K: KeyType> Default for AllowedKeys<K> {
fn default() -> Self {
Self::InstancePublicKeys(HashSet::new())
}
}
#[derive(Debug)]
pub enum NetworkEvent<K: KeyType> {
InstanceRequestInbound {
peer: PeerId,
request: InstanceMessageRequest<K>,
},
InstanceResponseInbound {
peer: PeerId,
response: InstanceMessageResponse<K>,
},
InstanceRequestOutbound {
peer: PeerId,
request: InstanceMessageRequest<K>,
},
InstanceResponseOutbound {
peer: PeerId,
response: InstanceMessageResponse<K>,
},
GossipReceived {
source: PeerId,
topic: String,
message: Vec<u8>,
},
GossipSent { topic: String, message: Vec<u8> },
PeerConnected(PeerId),
PeerDisconnected(PeerId),
HandshakeCompleted { peer: PeerId },
HandshakeFailed { peer: PeerId, reason: String },
}
#[derive(Debug)]
pub enum NetworkEventSendError<K: KeyType> {
PeerConnected(PeerId),
PeerDisconnected(PeerId),
HandshakeCompleted {
peer: PeerId,
},
HandshakeFailed {
peer: PeerId,
reason: String,
},
InstanceRequestInbound {
peer: PeerId,
request: InstanceMessageRequest<K>,
},
InstanceResponseInbound {
peer: PeerId,
response: InstanceMessageResponse<K>,
},
InstanceRequestOutbound {
peer: PeerId,
request: InstanceMessageRequest<K>,
},
InstanceResponseOutbound {
peer: PeerId,
response: InstanceMessageResponse<K>,
},
GossipReceived {
source: PeerId,
topic: String,
message: Vec<u8>,
},
GossipSent {
topic: String,
message: Vec<u8>,
},
}
impl<K: KeyType> Display for NetworkEventSendError<K> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
NetworkEventSendError::PeerConnected(peer) => {
write!(f, "Error sending Peer connected event: {}", peer)
}
NetworkEventSendError::PeerDisconnected(peer) => {
write!(f, "Error sending Peer disconnected event: {}", peer)
}
NetworkEventSendError::HandshakeCompleted { peer } => {
write!(f, "Error sending Handshake completed event: {}", peer)
}
NetworkEventSendError::HandshakeFailed { peer, reason } => {
write!(
f,
"Error sending Handshake failed event: {} ({})",
peer, reason
)
}
NetworkEventSendError::InstanceRequestInbound { peer, request } => {
write!(
f,
"Error sending Instance request inbound event: {} ({:#?})",
peer, request
)
}
NetworkEventSendError::InstanceResponseInbound { peer, response } => {
write!(
f,
"Error sending Instance response inbound event: {} ({:#?})",
peer, response
)
}
NetworkEventSendError::InstanceRequestOutbound { peer, request } => {
write!(
f,
"Error sending Instance request outbound event: {} ({:#?})",
peer, request
)
}
NetworkEventSendError::InstanceResponseOutbound { peer, response } => {
write!(
f,
"Error sending Instance response outbound event: {} ({:#?})",
peer, response
)
}
NetworkEventSendError::GossipReceived {
source,
topic,
message,
} => {
write!(
f,
"Error sending Gossip received event on topic: {} from source: {} ({:#?})",
topic, source, message
)
}
NetworkEventSendError::GossipSent { topic, message } => {
write!(
f,
"Error sending Gossip sent event on topic: {} ({:#?})",
topic, message
)
}
}
}
}
#[derive(Debug)]
pub enum NetworkCommandMessage<K: KeyType> {
InstanceRequest {
peer: PeerId,
request: InstanceMessageRequest<K>,
},
GossipMessage {
source: PeerId,
topic: String,
message: Vec<u8>,
},
SubscribeToTopic(String),
UnsubscribeFromTopic(String),
}
#[derive(Debug, Clone)]
pub struct NetworkConfig<K: KeyType> {
pub network_name: String,
pub instance_id: String,
pub instance_key_pair: K::Secret,
pub local_key: Keypair,
pub listen_addr: Multiaddr,
pub target_peer_count: u32,
pub bootstrap_peers: Vec<Multiaddr>,
pub enable_mdns: bool,
pub enable_kademlia: bool,
pub using_evm_address_for_handshake_verification: bool,
}
pub struct NetworkService<K: KeyType> {
swarm: Swarm<BlueprintBehaviour<K>>,
local_signing_key: K::Secret,
pub(crate) peer_manager: Arc<PeerManager<K>>,
network_sender: Sender<NetworkCommandMessage<K>>,
network_receiver: Receiver<NetworkCommandMessage<K>>,
protocol_message_receiver: Receiver<ProtocolMessage>,
event_sender: Sender<NetworkEvent<K>>,
#[expect(dead_code)] event_receiver: Receiver<NetworkEvent<K>>,
bootstrap_peers: HashSet<Multiaddr>,
allowed_keys_rx: Receiver<AllowedKeys<K>>,
}
impl<K: KeyType> NetworkService<K> {
#[allow(clippy::missing_panics_doc)] pub fn new(
config: NetworkConfig<K>,
allowed_keys: AllowedKeys<K>,
allowed_keys_rx: Receiver<AllowedKeys<K>>,
) -> Result<Self, Error> {
let NetworkConfig::<K> {
network_name,
instance_id,
instance_key_pair,
local_key,
listen_addr,
target_peer_count,
bootstrap_peers,
enable_mdns: _,
enable_kademlia: _,
using_evm_address_for_handshake_verification,
..
} = config;
let peer_manager = Arc::new(PeerManager::new(allowed_keys));
let blueprint_protocol_name = format!("/{network_name}/{instance_id}");
let (network_sender, network_receiver) = crossbeam_channel::unbounded();
let (protocol_message_sender, protocol_message_receiver) = crossbeam_channel::unbounded();
let (event_sender, event_receiver) = crossbeam_channel::unbounded();
let blueprint_behaviour_config = BlueprintBehaviourConfig {
network_name,
blueprint_protocol_name: blueprint_protocol_name.clone(),
local_key: local_key.clone(),
instance_key_pair: instance_key_pair.clone(),
target_peer_count,
peer_manager: peer_manager.clone(),
protocol_message_sender,
using_evm_address_for_handshake_verification,
};
let behaviour = BlueprintBehaviour::new(blueprint_behaviour_config)?;
let mut swarm = SwarmBuilder::with_existing_identity(local_key)
.with_tokio()
.with_tcp(
libp2p::tcp::Config::default().nodelay(true),
libp2p::noise::Config::new,
libp2p::yamux::Config::default,
)?
.with_quic_config(|mut config| {
config.handshake_timeout = Duration::from_secs(30);
config
})
.with_dns()?
.with_behaviour(|_| behaviour)
.unwrap()
.build();
swarm
.behaviour_mut()
.blueprint_protocol
.subscribe(&blueprint_protocol_name)?;
swarm.listen_on(listen_addr)?;
let bootstrap_peers = bootstrap_peers.into_iter().collect();
Ok(Self {
swarm,
local_signing_key: instance_key_pair,
peer_manager,
network_sender,
network_receiver,
protocol_message_receiver,
event_sender,
event_receiver,
bootstrap_peers,
allowed_keys_rx,
})
}
pub fn network_sender(&self) -> Sender<NetworkCommandMessage<K>> {
self.network_sender.clone()
}
pub fn start(self) -> NetworkServiceHandle<K> {
let local_peer_id = *self.swarm.local_peer_id();
let network_sender = self.network_sender.clone();
let protocol_message_receiver = self.protocol_message_receiver.clone();
let handle = NetworkServiceHandle::new(
local_peer_id,
self.swarm
.behaviour()
.blueprint_protocol
.blueprint_protocol_name
.clone(),
self.local_signing_key.clone(),
self.peer_manager.clone(),
network_sender,
protocol_message_receiver,
);
let mut info = PeerInfo::default();
for addr in self.swarm.listeners() {
info.addresses.insert(addr.clone());
}
self.peer_manager.update_peer(local_peer_id, info);
let peer_manager = self.peer_manager.clone();
let allowed_keys_rx = self.allowed_keys_rx.clone();
tokio::spawn(async move {
peer_manager.run_allowed_keys_updater(&allowed_keys_rx);
});
tokio::spawn(async move {
Box::pin(self.run()).await;
});
handle
}
async fn run(mut self) {
info!("Starting network service");
if let Err(e) = self.swarm.behaviour_mut().bootstrap() {
warn!("Failed to bootstrap with Kademlia: {}", e);
}
for addr in &self.bootstrap_peers {
debug!("Dialing bootstrap peer at {}", addr);
if let Err(e) = self.swarm.dial(addr.clone()) {
warn!("Failed to dial bootstrap peer: {}", e);
}
}
let mut last_handshake_retry = tokio::time::Instant::now();
const HANDSHAKE_RETRY_INTERVAL: Duration = Duration::from_secs(3);
loop {
let now = tokio::time::Instant::now();
if now.duration_since(last_handshake_retry) >= HANDSHAKE_RETRY_INTERVAL {
self.retry_unverified_handshakes();
last_handshake_retry = now;
}
tokio::select! {
swarm_event = self.swarm.select_next_some() => {
match swarm_event {
SwarmEvent::NewListenAddr { address, .. } => {
info!("New listen address: {}", address);
let local_peer_id = *self.swarm.local_peer_id();
let mut info = self.peer_manager.get_peer_info(&local_peer_id)
.unwrap_or_default();
info.addresses.insert(address.clone());
self.peer_manager.update_peer(local_peer_id, info);
},
SwarmEvent::Behaviour(event) => {
if let Err(e) = handle_behaviour_event(
&mut self.swarm,
&self.peer_manager,
event,
&self.event_sender,
)
{
warn!("Failed to handle swarm event: {}", e);
}
},
_ => {}
}
}
Ok(msg) = async { self.network_receiver.try_recv() } => {
if let Err(e) = handle_network_message(
&mut self.swarm,
msg,
&self.peer_manager,
&self.event_sender,
)
{
warn!("Failed to handle network message: {}", e);
}
}
() = tokio::time::sleep(Duration::from_millis(100)) => {}
else => break,
}
}
info!("Network service stopped");
}
fn retry_unverified_handshakes(&mut self) {
let connected_peers = self.swarm.behaviour().discovery.get_peers().clone();
for peer_id in connected_peers {
if self.peer_manager.is_peer_verified(&peer_id) || self.peer_manager.is_banned(&peer_id)
{
continue;
}
debug!("Retrying handshake with unverified peer: {}", peer_id);
if let Err(e) = self
.swarm
.behaviour_mut()
.blueprint_protocol
.send_handshake(&peer_id)
{
debug!("Failed to retry handshake with peer {}: {:?}", peer_id, e);
}
}
}
pub fn get_listen_addr(&self) -> Option<Multiaddr> {
self.swarm.listeners().next().cloned()
}
}
fn handle_behaviour_event<K: KeyType>(
swarm: &mut Swarm<BlueprintBehaviour<K>>,
peer_manager: &Arc<PeerManager<K>>,
event: BlueprintBehaviourEvent<K>,
event_sender: &Sender<NetworkEvent<K>>,
) -> Result<(), Error> {
match event {
BlueprintBehaviourEvent::ConnectionLimits(_) => {}
BlueprintBehaviourEvent::Discovery(discovery_event) => {
handle_discovery_event(swarm, peer_manager, discovery_event, event_sender)?;
}
BlueprintBehaviourEvent::BlueprintProtocol(blueprint_event) => {
handle_blueprint_protocol_event(swarm, peer_manager, blueprint_event, event_sender)?;
}
BlueprintBehaviourEvent::Ping(ping_event) => {
handle_ping_event(swarm, peer_manager, ping_event, event_sender)?;
}
}
Ok(())
}
fn handle_discovery_event<K: KeyType>(
swarm: &mut Swarm<BlueprintBehaviour<K>>,
peer_manager: &Arc<PeerManager<K>>,
event: DiscoveryEvent,
event_sender: &Sender<NetworkEvent<K>>,
) -> Result<(), Error> {
match event {
DiscoveryEvent::PeerConnected(peer_id) => {
info!("Peer connected, {peer_id}");
if let Some(info) = swarm.behaviour().discovery.peer_info.get(&peer_id) {
peer_manager.update_peer(peer_id, info.clone());
}
event_sender
.send(NetworkEvent::PeerConnected(peer_id))
.map_err(|_| {
SendError(NetworkEventSendError::<K>::PeerConnected(peer_id).to_string())
})?;
}
DiscoveryEvent::PeerDisconnected(peer_id) => {
info!("Peer disconnected, {peer_id}");
peer_manager.remove_peer(&peer_id, "disconnected");
event_sender
.send(NetworkEvent::PeerDisconnected(peer_id))
.map_err(|_| {
SendError(NetworkEventSendError::<K>::PeerDisconnected(peer_id).to_string())
})?;
}
DiscoveryEvent::Discovery(discovery_event) => match &*discovery_event {
DerivedDiscoveryBehaviourEvent::Identify(identify::Event::Received {
peer_id,
info,
..
}) => {
info!(%peer_id, "Received identify event");
let protocols: HashSet<String> = info
.protocols
.iter()
.map(std::string::ToString::to_string)
.collect();
trace!(%peer_id, ?protocols, "Supported protocols");
let blueprint_protocol_name =
&swarm.behaviour().blueprint_protocol.blueprint_protocol_name;
if !protocols.contains(blueprint_protocol_name) {
warn!(%peer_id, %blueprint_protocol_name, "Peer does not support required protocol");
peer_manager.ban_peer_with_default_duration(*peer_id, "protocol unsupported");
return Ok(());
}
let mut peer_info = peer_manager.get_peer_info(peer_id).unwrap_or_default();
peer_info.identify_info = Some(info.clone());
trace!(%peer_id, listen_addrs=?info.listen_addrs, "Adding identify addresses");
for addr in &info.listen_addrs {
peer_info.addresses.insert(addr.clone());
}
trace!(%peer_id, "Updating peer info with identify information");
peer_manager.update_peer(*peer_id, peer_info);
debug!(%peer_id, "Successfully processed identify information");
}
DerivedDiscoveryBehaviourEvent::Kademlia(kad::Event::OutboundQueryProgressed {
result: kad::QueryResult::GetClosestPeers(Ok(ok)),
..
}) => {
for peer_info in &ok.peers {
if !peer_manager.get_peers().contains_key(&peer_info.peer_id) {
info!(%peer_info.peer_id, "Newly discovered peer from Kademlia");
let info = PeerInfo::default();
peer_manager.update_peer(peer_info.peer_id, info);
let addrs: Vec<_> = peer_info.addrs.clone();
for addr in addrs {
debug!(%peer_info.peer_id, %addr, "Dialing peer from Kademlia");
if let Err(e) = swarm.dial(DialOpts::from(addr)) {
warn!("Failed to dial address: {}", e);
}
}
}
}
}
DerivedDiscoveryBehaviourEvent::Mdns(mdns::Event::Discovered(list)) => {
for (peer_id, addr) in list {
if !peer_manager.get_peers().contains_key(peer_id) {
info!(%peer_id, %addr, "Newly discovered peer from Mdns");
let mut info = PeerInfo::default();
info.addresses.insert(addr.clone());
peer_manager.update_peer(*peer_id, info);
debug!(%peer_id, %addr, "Dialing peer from Mdns");
if let Err(e) = swarm.dial(DialOpts::from(addr.clone())) {
warn!("Failed to dial address: {}", e);
}
}
}
}
_ => {}
},
}
Ok(())
}
fn handle_blueprint_protocol_event<K: KeyType>(
_swarm: &mut Swarm<BlueprintBehaviour<K>>,
_peer_manager: &Arc<PeerManager<K>>,
event: BlueprintProtocolEvent<K>,
event_sender: &Sender<NetworkEvent<K>>,
) -> Result<(), Error> {
match event {
BlueprintProtocolEvent::Request {
peer,
request,
channel: _,
} => event_sender
.send(NetworkEvent::InstanceRequestInbound {
peer,
request: request.clone(),
})
.map_err(|_| {
SendError(
NetworkEventSendError::<K>::InstanceRequestInbound { peer, request }
.to_string(),
)
})?,
BlueprintProtocolEvent::Response {
peer,
response,
request_id: _,
} => event_sender
.send(NetworkEvent::InstanceResponseInbound {
peer,
response: response.clone(),
})
.map_err(|_| {
SendError(
NetworkEventSendError::<K>::InstanceResponseInbound { peer, response }
.to_string(),
)
})?,
BlueprintProtocolEvent::GossipMessage {
source,
topic,
message,
} => event_sender
.send(NetworkEvent::GossipReceived {
source,
topic: topic.to_string(),
message: message.clone(),
})
.map_err(|_| {
SendError(
NetworkEventSendError::<K>::GossipReceived {
source,
topic: topic.to_string(),
message,
}
.to_string(),
)
})?,
}
Ok(())
}
#[expect(clippy::unnecessary_wraps)]
fn handle_ping_event<K: KeyType>(
_swarm: &mut Swarm<BlueprintBehaviour<K>>,
_peer_manager: &Arc<PeerManager<K>>,
event: ping::Event,
_event_sender: &Sender<NetworkEvent<K>>,
) -> Result<(), Error> {
match event.result {
Ok(rtt) => {
trace!(
"PingSuccess::Ping rtt to {} is {} ms",
event.peer,
rtt.as_millis()
);
}
Err(ping::Failure::Unsupported) => {
debug!(peer=%event.peer, "Ping protocol unsupported");
}
Err(ping::Failure::Timeout) => {
debug!("Ping timeout: {}", event.peer);
}
Err(ping::Failure::Other { error }) => {
debug!("Ping failure: {error}");
}
}
Ok(())
}
fn handle_network_message<K: KeyType>(
swarm: &mut Swarm<BlueprintBehaviour<K>>,
msg: NetworkCommandMessage<K>,
peer_manager: &Arc<PeerManager<K>>,
event_sender: &Sender<NetworkEvent<K>>,
) -> Result<(), Error> {
match msg {
NetworkCommandMessage::InstanceRequest { peer, request } => {
if !peer_manager.is_peer_verified(&peer) {
warn!(%peer, "Attempted to send request to unverified peer");
return Ok(());
}
debug!(%peer, ?request, "Sending instance request");
swarm
.behaviour_mut()
.blueprint_protocol
.send_request(&peer, request.clone());
event_sender
.send(NetworkEvent::InstanceRequestOutbound {
peer,
request: request.clone(),
})
.map_err(|_| {
SendError(
NetworkEventSendError::<K>::InstanceRequestOutbound { peer, request }
.to_string(),
)
})?;
}
NetworkCommandMessage::GossipMessage {
source,
topic,
message,
} => {
debug!(%source, %topic, "Publishing gossip message");
if let Err(e) = swarm
.behaviour_mut()
.blueprint_protocol
.publish(&topic, message.clone())
{
warn!(%source, %topic, "Failed to publish gossip message: {:?}", e);
return Ok(());
}
event_sender
.send(NetworkEvent::GossipSent {
topic: topic.to_string(),
message: message.clone(),
})
.map_err(|_| {
SendError(NetworkEventSendError::<K>::GossipSent { topic, message }.to_string())
})?;
}
NetworkCommandMessage::SubscribeToTopic(topic) => {
swarm.behaviour_mut().blueprint_protocol.subscribe(&topic)?;
}
NetworkCommandMessage::UnsubscribeFromTopic(topic) => {
swarm.behaviour_mut().blueprint_protocol.unsubscribe(&topic);
}
}
Ok(())
}