use crate::{
network::{error::PubSubError, pubsub},
settings, Receipt, RECEIPT_TAG, WORKFLOW_TAG,
};
use anyhow::{Context, Result};
use const_format::formatcp;
use enum_assoc::Assoc;
use faststr::FastStr;
use futures::future::Either;
use libp2p::{
core::{
muxing::StreamMuxerBox,
transport::{self, OptionalTransport},
upgrade,
},
dns,
gossipsub::{self, MessageId, TopicHash},
identify,
identity::Keypair,
kad::{
self,
store::{MemoryStore, MemoryStoreConfig},
},
mdns,
multiaddr::Protocol,
noise, quic, rendezvous,
request_response::{self, ProtocolSupport},
swarm::{self, behaviour::toggle::Toggle, NetworkBehaviour, Swarm},
yamux, PeerId, StreamProtocol, Transport,
};
use serde::{Deserialize, Serialize};
use std::fmt;
use tracing::{info, warn};
/// Homestar protocol version string of the form `homestar/<crate version>`,
/// built at compile time from [`VERSION`].
pub(crate) const HOMESTAR_PROTOCOL_VER: &str = formatcp!("homestar/{VERSION}");
/// Crate version, read from the `CARGO_PKG_VERSION` environment variable at
/// compile time.
const VERSION: &str = env!("CARGO_PKG_VERSION");
/// Build the libp2p [`Swarm`] for this node from the given network settings.
///
/// The steps are, in order: obtain the node keypair, derive and log the local
/// peer ID, build the transport, construct each behaviour, assemble the
/// [`ComposedBehaviour`], create the swarm and finally run [`init`] on it.
///
/// Gossipsub, mDNS and the rendezvous client/server are optional: each is only
/// constructed when its `enable*` flag is set in `settings`, and is wrapped in
/// a [`Toggle`] either way.
///
/// # Errors
///
/// Returns an error if the keypair cannot be generated/imported, if the
/// transport, the pubsub behaviour or the mDNS behaviour fails to build, or if
/// [`init`] fails.
pub(crate) async fn new(settings: &settings::Network) -> Result<Swarm<ComposedBehaviour>> {
    let keypair = settings
        .keypair_config
        .keypair()
        .with_context(|| String::from("failed to generate/import keypair for libp2p"))?;
    let peer_id = keypair.public().to_peer_id();

    info!(
        subject = "swarm.init",
        category = "libp2p.swarm",
        peer_id = peer_id.to_string(),
        "local peer ID generated"
    );

    let transport = build_transport(settings, keypair.clone())?;

    // Optional: gossipsub-based pubsub.
    let pubsub_behaviour = settings
        .libp2p
        .pubsub
        .enable
        .then(|| pubsub::new(keypair.clone(), settings.libp2p().pubsub()))
        .transpose()?;

    // Kademlia backed by an in-memory record store. Both the maximum stored
    // value size and the maximum packet size are set to 10 MiB.
    let kademlia_behaviour = {
        let store = MemoryStore::with_config(
            peer_id,
            MemoryStoreConfig {
                max_value_bytes: 10 * 1024 * 1024,
                ..Default::default()
            },
        );

        let mut kad_config = kad::Config::default();
        kad_config.set_max_packet_size(10 * 1024 * 1024);
        kad_config.set_kbucket_inserts(kad::BucketInserts::Manual);

        kad::Behaviour::with_config(peer_id, store, kad_config)
    };

    // CBOR request/response exchange, supporting inbound and outbound requests.
    let exchange_behaviour = request_response::cbor::Behaviour::new(
        [(
            StreamProtocol::new("/homestar-exchange/1.0.0"),
            ProtocolSupport::Full,
        )],
        request_response::Config::default(),
    );

    // Optional: mDNS discovery.
    let mdns_behaviour = settings
        .libp2p
        .mdns
        .enable
        .then(|| {
            mdns::tokio::Behaviour::new(
                mdns::Config {
                    ttl: settings.libp2p.mdns.ttl,
                    query_interval: settings.libp2p.mdns.query_interval,
                    enable_ipv6: settings.libp2p.mdns.enable_ipv6,
                },
                peer_id,
            )
        })
        .transpose()?;

    // Optional: rendezvous client and server.
    let rendezvous_client_behaviour = settings
        .libp2p
        .rendezvous
        .enable_client
        .then(|| rendezvous::client::Behaviour::new(keypair.clone()));
    let rendezvous_server_behaviour = settings.libp2p.rendezvous.enable_server.then(|| {
        rendezvous::server::Behaviour::new(rendezvous::server::Config::default().with_min_ttl(1))
    });

    let identify_behaviour = identify::Behaviour::new(
        identify::Config::new(HOMESTAR_PROTOCOL_VER.to_string(), keypair.public())
            .with_agent_version(format!("homestar-runtime/{}", env!("CARGO_PKG_VERSION"))),
    );

    let behaviour = ComposedBehaviour {
        gossipsub: Toggle::from(pubsub_behaviour),
        kademlia: kademlia_behaviour,
        request_response: exchange_behaviour,
        mdns: Toggle::from(mdns_behaviour),
        rendezvous_client: Toggle::from(rendezvous_client_behaviour),
        rendezvous_server: Toggle::from(rendezvous_server_behaviour),
        identify: identify_behaviour,
    };

    let swarm_config = swarm::Config::with_tokio_executor()
        .with_idle_connection_timeout(settings.libp2p.idle_connection_timeout);

    let mut swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
    init(&mut swarm, settings)?;

    Ok(swarm)
}
/// Initialise a freshly created swarm from the network settings.
///
/// In order, this:
/// 1. starts listening on the configured listen address,
/// 2. puts Kademlia into server mode,
/// 3. registers every configured announce address as an external address (or
///    logs that none are configured),
/// 4. dials the configured node addresses, up to `max_connected_peers` of them,
///    and adds those that carry a peer ID to the Kademlia routing table,
/// 5. subscribes to the receipts topic when pubsub is enabled.
///
/// # Errors
///
/// Returns an error if the listen address cannot be parsed or listened on, or
/// if subscribing to the receipts topic fails. A failed dial is only logged.
pub(crate) fn init(
    swarm: &mut Swarm<ComposedBehaviour>,
    settings: &settings::Network,
) -> Result<()> {
    let cfg = &settings.libp2p;

    swarm.listen_on(cfg.listen_address.to_string().parse()?)?;

    swarm
        .behaviour_mut()
        .kademlia
        .set_mode(Some(kad::Mode::Server));

    if cfg.announce_addresses.is_empty() {
        info!(
            subject = "swarm.init",
            category = "libp2p.swarm",
            "no addresses to announce to peers defined in settings: node may be unreachable to external peers"
        );
    } else {
        for external_addr in cfg.announce_addresses.iter().cloned() {
            swarm.add_external_address(external_addr);
        }
    }

    // Only the first `max_connected_peers` configured nodes are dialed; the
    // rest are reported and skipped.
    let dial_limit = cfg.max_connected_peers as usize;
    for (index, addr) in cfg.node_addresses.iter().enumerate() {
        if index >= dial_limit {
            warn!(subject = "swarm.init.err",
                  category = "libp2p.swarm",
                  addr=?addr,
                  "address not dialed because node addresses count exceeds max connected peers configuration");
            continue;
        }

        // A failed dial is logged and otherwise ignored.
        if let Err(e) = swarm.dial(addr.clone()) {
            warn!(subject = "swarm.init.err",
                  category = "libp2p.swarm",
                  err=?e, "failed to dial configured node");
        }

        // The routing table entry needs the peer ID embedded in the address.
        let embedded_peer = addr.iter().find_map(|proto| match proto {
            Protocol::P2p(peer_id) => Some(peer_id),
            _ => None,
        });

        match embedded_peer {
            Some(peer_id) => {
                info!(subject = "swarm.init",
                      category = "libp2p.swarm",
                      addr=?addr,
                      "added configured node to kademlia routing table");
                swarm
                    .behaviour_mut()
                    .kademlia
                    .add_address(&peer_id, addr.clone());
            }
            None => {
                warn!(subject = "swarm.init.err",
                      category = "libp2p.swarm",
                      addr=?addr,
                      err="configured node address did not include a peer ID",
                      "node not added to kademlia routing table");
            }
        }
    }

    if cfg.pubsub.enable {
        swarm
            .behaviour_mut()
            .gossip_subscribe(pubsub::RECEIPTS_TOPIC)?;
    }

    Ok(())
}
/// Peer-discovery information; currently it only carries the peer ID of a
/// rendezvous point.
#[derive(Debug, Clone)]
pub(crate) struct PeerDiscoveryInfo {
/// Peer ID of the rendezvous point.
pub(crate) rendezvous_point: PeerId,
}
impl PeerDiscoveryInfo {
pub(crate) fn new(rendezvous_point: PeerId) -> Self {
Self { rendezvous_point }
}
}
/// Request type carried by the request/response behaviour: a CID together with
/// the tag of the capsule it refers to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RequestResponseKey {
/// CID, stored as a string.
pub(crate) cid: FastStr,
/// Capsule tag associated with the CID.
pub(crate) capsule_tag: CapsuleTag,
}
impl RequestResponseKey {
pub(crate) fn new(cid: FastStr, capsule_tag: CapsuleTag) -> Self {
Self { cid, capsule_tag }
}
}
/// Tag identifying the kind of capsule a [`RequestResponseKey`] refers to.
///
/// The `Assoc` derive generates two associated functions from the `#[func]`
/// declarations below: `tag`, which maps a variant to its static tag string,
/// and `capsule_type`, which maps a tag string back to a variant (`None` when
/// no variant is associated with the string).
#[derive(Debug, Clone, Assoc, Serialize, Deserialize)]
#[func(pub(crate) fn tag(&self) -> &'static str)]
#[func(pub(crate) fn capsule_type(s: &str) -> Option<Self>)]
pub(crate) enum CapsuleTag {
/// Receipt capsule, associated with `RECEIPT_TAG`.
#[assoc(tag = RECEIPT_TAG)]
#[assoc(capsule_type = RECEIPT_TAG)]
Receipt,
/// Workflow capsule, associated with `WORKFLOW_TAG`.
#[assoc(tag = WORKFLOW_TAG)]
#[assoc(capsule_type = WORKFLOW_TAG)]
Workflow,
}
impl fmt::Display for CapsuleTag {
    /// Write the variant's tag string, exactly as returned by `tag()`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.tag())
    }
}
/// Event type emitted to the swarm by [`ComposedBehaviour`]: one variant per
/// composed behaviour, each wrapping that behaviour's own event type.
#[derive(Debug)]
pub(crate) enum ComposedEvent {
/// Gossipsub event (boxed).
// NOTE(review): presumably boxed to keep this enum small — confirm.
Gossipsub(Box<gossipsub::Event>),
/// Kademlia event.
Kademlia(kad::Event),
/// Request/response event, with [`RequestResponseKey`] requests and raw byte
/// responses.
RequestResponse(request_response::Event<RequestResponseKey, Vec<u8>>),
/// mDNS event.
Mdns(mdns::Event),
/// Rendezvous client event.
RendezvousClient(rendezvous::client::Event),
/// Rendezvous server event.
RendezvousServer(rendezvous::server::Event),
/// Identify event.
Identify(identify::Event),
}
/// Message that can be published on a gossipsub topic through
/// `ComposedBehaviour::gossip_publish`.
#[derive(Debug)]
pub(crate) enum TopicMessage {
/// A pubsub message wrapping a [`Receipt`].
CapturedReceipt(pubsub::Message<Receipt>),
}
/// Network behaviour of the node, composed of several libp2p behaviours.
///
/// Events from every sub-behaviour are surfaced to the swarm as
/// [`ComposedEvent`]. Behaviours wrapped in [`Toggle`] are optional and may be
/// disabled.
// NOTE(review): `missing_debug_implementations` is allowed here, presumably
// because not every composed behaviour implements `Debug` — confirm.
#[allow(missing_debug_implementations)]
#[derive(NetworkBehaviour)]
#[behaviour(to_swarm = "ComposedEvent")]
pub(crate) struct ComposedBehaviour {
/// Optional gossipsub (pubsub) behaviour.
pub(crate) gossipsub: Toggle<gossipsub::Behaviour>,
/// Kademlia behaviour backed by an in-memory record store.
pub(crate) kademlia: kad::Behaviour<MemoryStore>,
/// CBOR request/response behaviour: [`RequestResponseKey`] requests, raw byte
/// responses.
pub(crate) request_response: request_response::cbor::Behaviour<RequestResponseKey, Vec<u8>>,
/// Optional mDNS behaviour (tokio flavour).
pub(crate) mdns: Toggle<mdns::tokio::Behaviour>,
/// Optional rendezvous client behaviour.
pub(crate) rendezvous_client: Toggle<rendezvous::client::Behaviour>,
/// Optional rendezvous server behaviour.
pub(crate) rendezvous_server: Toggle<rendezvous::server::Behaviour>,
/// Identify behaviour.
pub(crate) identify: identify::Behaviour,
}
impl ComposedBehaviour {
    /// Subscribe to the gossipsub topic named `topic`.
    ///
    /// Returns the boolean reported by the underlying gossipsub `subscribe`
    /// call.
    ///
    /// # Errors
    ///
    /// Returns [`PubSubError::NotEnabled`] when the gossipsub behaviour is
    /// disabled, or the converted subscription error when subscribing fails.
    pub(crate) fn gossip_subscribe(&mut self, topic: &str) -> Result<bool, PubSubError> {
        let behaviour = self.gossipsub.as_mut().ok_or(PubSubError::NotEnabled)?;
        let ident_topic = gossipsub::IdentTopic::new(topic);

        Ok(behaviour.subscribe(&ident_topic)?)
    }

    /// Serialize `msg` and publish it on the gossipsub topic named `topic`.
    ///
    /// The message is serialized first; it is then published only if the topic
    /// has at least one mesh peer.
    ///
    /// # Errors
    ///
    /// Returns [`PubSubError::NotEnabled`] when the gossipsub behaviour is
    /// disabled, the conversion error when the message cannot be turned into
    /// bytes, [`PubSubError::InsufficientPeers`] when the topic has no mesh
    /// peers, or the converted publish error when publishing fails.
    pub(crate) fn gossip_publish(
        &mut self,
        topic: &str,
        msg: TopicMessage,
    ) -> Result<MessageId, PubSubError> {
        let behaviour = self.gossipsub.as_mut().ok_or(PubSubError::NotEnabled)?;

        let TopicMessage::CapturedReceipt(message) = msg;
        let payload: Vec<u8> = message.try_into()?;

        let has_mesh_peers = behaviour
            .mesh_peers(&TopicHash::from_raw(topic))
            .next()
            .is_some();
        if !has_mesh_peers {
            return Err(PubSubError::InsufficientPeers(topic.to_owned()));
        }

        Ok(behaviour.publish(gossipsub::IdentTopic::new(topic), payload)?)
    }
}
impl From<gossipsub::Event> for ComposedEvent {
    /// Box a gossipsub event and wrap it in [`ComposedEvent::Gossipsub`].
    fn from(gossipsub_event: gossipsub::Event) -> Self {
        Self::Gossipsub(Box::new(gossipsub_event))
    }
}
impl From<kad::Event> for ComposedEvent {
    /// Wrap a Kademlia event in [`ComposedEvent::Kademlia`].
    fn from(kad_event: kad::Event) -> Self {
        Self::Kademlia(kad_event)
    }
}
impl From<request_response::Event<RequestResponseKey, Vec<u8>>> for ComposedEvent {
    /// Wrap a request/response event in [`ComposedEvent::RequestResponse`].
    fn from(exchange_event: request_response::Event<RequestResponseKey, Vec<u8>>) -> Self {
        Self::RequestResponse(exchange_event)
    }
}
impl From<mdns::Event> for ComposedEvent {
    /// Wrap an mDNS event in [`ComposedEvent::Mdns`].
    fn from(mdns_event: mdns::Event) -> Self {
        Self::Mdns(mdns_event)
    }
}
impl From<rendezvous::client::Event> for ComposedEvent {
    /// Wrap a rendezvous client event in [`ComposedEvent::RendezvousClient`].
    fn from(client_event: rendezvous::client::Event) -> Self {
        Self::RendezvousClient(client_event)
    }
}
impl From<rendezvous::server::Event> for ComposedEvent {
    /// Wrap a rendezvous server event in [`ComposedEvent::RendezvousServer`].
    fn from(server_event: rendezvous::server::Event) -> Self {
        Self::RendezvousServer(server_event)
    }
}
impl From<identify::Event> for ComposedEvent {
    /// Wrap an identify event in [`ComposedEvent::Identify`].
    fn from(identify_event: identify::Event) -> Self {
        Self::Identify(identify_event)
    }
}
fn build_transport(
settings: &settings::Network,
keypair: Keypair,
) -> Result<transport::Boxed<(PeerId, StreamMuxerBox)>> {
let build_tcp = || libp2p::tcp::tokio::Transport::new(libp2p::tcp::Config::new().nodelay(true));
let build_quic = if settings.libp2p.quic.enable {
OptionalTransport::some(quic::tokio::Transport::new(quic::Config::new(&keypair)))
} else {
OptionalTransport::none()
};
let dns_transport = || {
if let Ok((conf, opts)) = hickory_resolver::system_conf::read_system_conf() {
info!(
subject = "swarm.init",
category = "libp2p.swarm",
"using system DNS configuration from /etc/resolv.conf"
);
dns::tokio::Transport::custom(build_tcp(), conf, opts)
} else {
info!(
subject = "swarm.init",
category = "libp2p.swarm",
"using cloudflare DNS configuration as a fallback"
);
dns::tokio::Transport::custom(
build_tcp(),
dns::ResolverConfig::cloudflare(),
dns::ResolverOpts::default(),
)
}
};
let transport = libp2p::websocket::WsConfig::new(dns_transport())
.or_transport(dns_transport())
.or_transport(libp2p::websocket::WsConfig::new(build_tcp()))
.upgrade(upgrade::Version::V1Lazy)
.authenticate(noise::Config::new(&keypair)?)
.multiplex(yamux::Config::default())
.timeout(settings.libp2p.transport_connection_timeout)
.or_transport(build_quic)
.map(|either_output, _| match either_output {
Either::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
Either::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
})
.boxed();
Ok(transport)
}