mod executor;
mod transport;
use crate::behaviour;
#[cfg(feature = "request-response")]
use crate::behaviour::request_response::RequestResponseConfig;
#[cfg(feature = "dns")]
use crate::builder::transport::DnsResolver;
use crate::builder::transport::TransportConfig;
use crate::handle::Connexa;
use crate::task::ConnexaTask;
use executor::ConnexaExecutor;
use libp2p::Swarm;
#[cfg(feature = "autonat")]
use libp2p::autonat::v1::Config as AutonatV1Config;
#[cfg(feature = "autonat")]
use libp2p::autonat::v2::client::Config as AutonatV2ClientConfig;
#[cfg(feature = "floodsub")]
use libp2p::floodsub::FloodsubConfig;
#[cfg(feature = "identify")]
use libp2p::identify::Config as IdentifyConfig;
use libp2p::identity::Keypair;
#[cfg(feature = "kad")]
use libp2p::kad::Config as KadConfig;
#[cfg(feature = "ping")]
use libp2p::ping::Config as PingConfig;
#[cfg(feature = "pnet")]
use libp2p::pnet::PreSharedKey;
#[cfg(feature = "relay")]
use libp2p::relay::Config as RelayServerConfig;
use libp2p::swarm::{NetworkBehaviour, SwarmEvent};
use libp2p_connection_limits::ConnectionLimits;
use std::fmt::Debug;
#[cfg(feature = "quic")]
use std::time::Duration;
use tracing::Span;
#[derive(Debug, Copy, Clone)]
pub enum FileDescLimit {
Max,
Custom(u64),
}
pub struct ConnexaBuilder<X, C, T>
where
C: NetworkBehaviour,
C: Send,
C::ToSwarm: Debug,
T: Send + Sync + 'static,
X: Default + Send + Sync + 'static,
{
keypair: Keypair,
context: X,
custom_behaviour: Option<Box<dyn Fn(Keypair) -> C>>,
file_descriptor_limits: Option<FileDescLimit>,
custom_task_callback:
Box<dyn Fn(&mut Swarm<behaviour::Behaviour<C>>, &mut X, T) + 'static + Send>,
custom_event_callback:
Box<dyn Fn(&mut Swarm<behaviour::Behaviour<C>>, &mut X, C::ToSwarm) + 'static + Send>,
swarm_event_callback: Box<dyn Fn(&SwarmEvent<behaviour::BehaviourEvent<C>>) + 'static + Send>,
config: Config,
swarm_config: Box<dyn Fn(libp2p::swarm::Config) -> libp2p::swarm::Config>,
transport_config: TransportConfig,
protocols: Protocols,
}
pub(crate) struct Config {
#[cfg(feature = "kad")]
pub kademlia_config: (String, Box<dyn Fn(KadConfig) -> KadConfig>),
#[cfg(feature = "gossipsub")]
pub gossipsub_config: Box<
dyn Fn(
libp2p::gossipsub::ConfigBuilder,
) -> Result<libp2p::gossipsub::Config, libp2p::gossipsub::ConfigBuilderError>,
>,
#[cfg(feature = "floodsub")]
pub floodsub_config: Box<dyn Fn(FloodsubConfig) -> FloodsubConfig>,
#[cfg(feature = "ping")]
pub ping_config: Box<dyn Fn(PingConfig) -> PingConfig>,
#[cfg(feature = "autonat")]
pub autonat_v1_config: Box<dyn Fn(AutonatV1Config) -> AutonatV1Config>,
#[cfg(feature = "autonat")]
pub autonat_v2_client_config: Box<dyn Fn(AutonatV2ClientConfig) -> AutonatV2ClientConfig>,
#[cfg(feature = "relay")]
pub relay_server_config: Box<dyn Fn(RelayServerConfig) -> RelayServerConfig>,
#[cfg(feature = "identify")]
pub identify_config: (String, Box<dyn Fn(IdentifyConfig) -> IdentifyConfig>),
#[cfg(feature = "request-response")]
pub request_response_config: Vec<RequestResponseConfig>,
pub connection_limits: Box<dyn Fn(ConnectionLimits) -> ConnectionLimits>,
}
impl Default for Config {
fn default() -> Self {
Self {
#[cfg(feature = "kad")]
kademlia_config: ("/ipfs/kad/1.0.0".to_string(), Box::new(|config| config)),
#[cfg(feature = "gossipsub")]
gossipsub_config: Box::new(|config| config.build()),
#[cfg(feature = "floodsub")]
floodsub_config: Box::new(|config| config),
#[cfg(feature = "ping")]
ping_config: Box::new(|config| config),
#[cfg(feature = "autonat")]
autonat_v1_config: Box::new(|config| config),
#[cfg(feature = "autonat")]
autonat_v2_client_config: Box::new(|config| config),
#[cfg(feature = "relay")]
relay_server_config: Box::new(|config| config),
#[cfg(feature = "identify")]
identify_config: (String::from("/ipfs/id"), Box::new(|config| config)),
#[cfg(feature = "request-response")]
request_response_config: vec![],
connection_limits: Box::new(|config| config),
}
}
}
#[derive(Default)]
pub(crate) struct Protocols {
#[cfg(feature = "gossipsub")]
pub(crate) gossipsub: bool,
#[cfg(feature = "floodsub")]
pub(crate) floodsub: bool,
#[cfg(feature = "kad")]
pub(crate) kad: bool,
#[cfg(feature = "relay")]
pub(crate) relay_client: bool,
#[cfg(feature = "relay")]
pub(crate) relay_server: bool,
#[cfg(all(feature = "dcutr", feature = "dcutr"))]
pub(crate) dcutr: bool,
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "mdns")]
pub(crate) mdns: bool,
#[cfg(feature = "identify")]
pub(crate) identify: bool,
#[cfg(feature = "autonat")]
pub(crate) autonat_v1: bool,
#[cfg(feature = "autonat")]
pub(crate) autonat_v2_client: bool,
#[cfg(feature = "autonat")]
pub(crate) autonat_v2_server: bool,
#[cfg(feature = "rendezvous")]
pub(crate) rendezvous_client: bool,
#[cfg(feature = "rendezvous")]
pub(crate) rendezvous_server: bool,
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "upnp")]
pub(crate) upnp: bool,
#[cfg(feature = "ping")]
pub(crate) ping: bool,
#[cfg(feature = "stream")]
pub(crate) streams: bool,
#[cfg(feature = "request-response")]
pub(crate) request_response: bool,
pub(crate) connection_limits: bool,
}
impl<X, C, T> ConnexaBuilder<X, C, T>
where
C: NetworkBehaviour,
C: Send,
C::ToSwarm: Debug,
T: Send + Sync + 'static,
X: Default + Unpin + Send + Sync + 'static,
{
pub fn new_identity() -> Self {
let keypair = Keypair::generate_ed25519();
Self::with_existing_identity(&keypair)
}
pub fn with_existing_identity(keypair: &Keypair) -> Self {
let keypair = keypair.clone();
Self {
keypair,
custom_behaviour: None,
context: X::default(),
file_descriptor_limits: None,
custom_task_callback: Box::new(|_, _, _| ()),
custom_event_callback: Box::new(|_, _, _| ()),
swarm_event_callback: Box::new(|_| ()),
config: Config::default(),
protocols: Protocols::default(),
swarm_config: Box::new(|config| config),
transport_config: TransportConfig::default(),
}
}
pub fn set_swarm_config<F>(mut self, f: F) -> Self
where
F: Fn(libp2p::swarm::Config) -> libp2p::swarm::Config + 'static,
{
self.swarm_config = Box::new(f);
self
}
pub fn set_file_descriptor_limit(mut self, limit: FileDescLimit) -> Self {
self.file_descriptor_limits = Some(limit);
self
}
pub fn set_custom_task_callback<F>(mut self, f: F) -> Self
where
F: Fn(&mut Swarm<behaviour::Behaviour<C>>, &mut X, T) + 'static + Send,
{
self.custom_task_callback = Box::new(f);
self
}
pub fn set_custom_event_callback<F>(mut self, f: F) -> Self
where
F: Fn(&mut Swarm<behaviour::Behaviour<C>>, &mut X, C::ToSwarm) + 'static + Send,
{
self.custom_event_callback = Box::new(f);
self
}
pub fn set_swarm_event_callback<F>(mut self, f: F) -> Self
where
F: Fn(&SwarmEvent<behaviour::BehaviourEvent<C>>) + 'static + Send,
{
self.swarm_event_callback = Box::new(f);
self
}
pub fn set_context(mut self, context: X) -> Self {
self.context = context;
self
}
#[cfg(feature = "kad")]
pub fn with_kademlia(self) -> Self {
self.with_kademlia_with_config("/ipfs/kad/1.0.0", |config| config)
}
#[cfg(feature = "kad")]
pub fn with_kademlia_with_config<F>(mut self, protocol: impl Into<String>, f: F) -> Self
where
F: Fn(KadConfig) -> KadConfig + 'static,
{
self.protocols.kad = true;
self.config.kademlia_config = (protocol.into(), Box::new(f));
self
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "mdns")]
pub fn with_mdns(mut self) -> Self {
self.protocols.mdns = true;
self
}
#[cfg(feature = "relay")]
pub fn with_relay(mut self) -> Self {
self.protocols.relay_client = true;
self
}
#[cfg(all(feature = "relay", feature = "dcutr"))]
pub fn with_dcutr(mut self) -> Self {
self.protocols.dcutr = true;
self
}
#[cfg(feature = "relay")]
pub fn with_relay_server(self) -> Self {
self.with_relay_server_with_config(|config| config)
}
#[cfg(feature = "relay")]
pub fn with_relay_server_with_config<F>(mut self, config: F) -> Self
where
F: Fn(RelayServerConfig) -> RelayServerConfig + 'static,
{
self.protocols.relay_server = true;
self.config.relay_server_config = Box::new(config);
self
}
#[cfg(not(target_arch = "wasm32"))]
#[cfg(feature = "upnp")]
pub fn with_upnp(mut self) -> Self {
self.protocols.upnp = true;
self
}
#[cfg(feature = "rendezvous")]
pub fn with_rendezvous_server(mut self) -> Self {
self.protocols.rendezvous_server = true;
self
}
#[cfg(feature = "rendezvous")]
pub fn with_rendezvous_client(mut self) -> Self {
self.protocols.rendezvous_client = true;
self
}
#[cfg(feature = "identify")]
pub fn with_identify(self) -> Self {
self.with_identify_with_config("/ipfs/id", |config| config)
}
#[cfg(feature = "identify")]
pub fn with_identify_with_config<F>(mut self, protocol: impl Into<String>, config: F) -> Self
where
F: Fn(IdentifyConfig) -> IdentifyConfig + 'static,
{
let protocol = protocol.into();
self.protocols.identify = true;
self.config.identify_config = (protocol, Box::new(config));
self
}
#[cfg(feature = "stream")]
pub fn with_streams(mut self) -> Self {
self.protocols.streams = true;
self
}
#[cfg(feature = "gossipsub")]
pub fn with_gossipsub(self) -> Self {
self.with_gossipsub_with_config(|config| config.build())
}
#[cfg(feature = "gossipsub")]
pub fn with_gossipsub_with_config<F>(mut self, config: F) -> Self
where
F: Fn(
libp2p::gossipsub::ConfigBuilder,
)
-> Result<libp2p::gossipsub::Config, libp2p::gossipsub::ConfigBuilderError>
+ 'static,
{
self.protocols.gossipsub = true;
self.config.gossipsub_config = Box::new(config);
self
}
#[cfg(feature = "floodsub")]
pub fn with_floodsub(self) -> Self {
self.with_floodsub_with_config(|config| config)
}
#[cfg(feature = "floodsub")]
pub fn with_floodsub_with_config<F>(mut self, config: F) -> Self
where
F: Fn(FloodsubConfig) -> FloodsubConfig + 'static,
{
self.protocols.floodsub = true;
self.config.floodsub_config = Box::new(config);
self
}
#[cfg(feature = "request-response")]
pub fn with_request_response(mut self, mut config: Vec<RequestResponseConfig>) -> Self {
if config.len() > 10 {
config.truncate(10);
}
self.protocols.request_response = true;
if config.is_empty() {
config.push(RequestResponseConfig::default());
}
self.config.request_response_config = config;
self
}
#[cfg(feature = "autonat")]
pub fn with_autonat_v1(self) -> Self {
self.with_autonat_v1_with_config(|config| config)
}
#[cfg(feature = "autonat")]
pub fn with_autonat_v1_with_config<F>(mut self, config: F) -> Self
where
F: Fn(AutonatV1Config) -> AutonatV1Config + 'static,
{
self.protocols.autonat_v1 = true;
self.config.autonat_v1_config = Box::new(config);
self
}
#[cfg(feature = "autonat")]
pub fn with_autonat_v2_client(self) -> Self {
self.with_autonat_v2_client_with_config(|config| config)
}
#[cfg(feature = "autonat")]
pub fn with_autonat_v2_client_with_config<F>(mut self, config: F) -> Self
where
F: Fn(AutonatV2ClientConfig) -> AutonatV2ClientConfig + 'static,
{
self.protocols.autonat_v2_client = true;
self.config.autonat_v2_client_config = Box::new(config);
self
}
#[cfg(feature = "autonat")]
pub fn with_autonat_v2_server(mut self) -> Self {
self.protocols.autonat_v2_server = true;
self
}
#[cfg(feature = "ping")]
pub fn with_ping(self) -> Self {
self.with_ping_with_config(|config| config)
}
#[cfg(feature = "ping")]
pub fn with_ping_with_config<F>(mut self, config: F) -> Self
where
F: Fn(PingConfig) -> PingConfig + 'static,
{
self.protocols.ping = true;
self.config.ping_config = Box::new(config);
self
}
pub fn with_custom_behaviour<F>(mut self, behaviour: F) -> Self
where
F: Fn(Keypair) -> C + 'static,
{
self.custom_behaviour = Some(Box::new(behaviour));
self
}
#[cfg(feature = "quic")]
pub fn enable_quic(self) -> Self {
self.enable_quic_with_config(|config| {
config.keep_alive_interval = Duration::from_millis(100);
config.max_idle_timeout = 300;
})
}
#[cfg(feature = "quic")]
pub fn enable_quic_with_config<F>(mut self, f: F) -> Self
where
F: FnMut(&mut libp2p::quic::Config) + 'static,
{
let callback = Box::new(f);
self.transport_config.quic_config_callback = callback;
self.transport_config.enable_quic = true;
self
}
#[cfg(feature = "tcp")]
pub fn enable_tcp(self) -> Self {
self.enable_tcp_with_config(|config| config.nodelay(true))
}
#[cfg(feature = "tcp")]
pub fn enable_tcp_with_config<F>(mut self, f: F) -> Self
where
F: FnOnce(libp2p::tcp::Config) -> libp2p::tcp::Config + 'static,
{
let callback = Box::new(f);
self.transport_config.tcp_config_callback = callback;
self.transport_config.enable_tcp = true;
self
}
#[cfg(feature = "pnet")]
pub fn enable_pnet(mut self, psk: PreSharedKey) -> Self {
self.transport_config.enable_pnet = true;
self.transport_config.pnet_psk = Some(psk);
self
}
#[cfg(feature = "websocket")]
pub fn enable_websocket(mut self) -> Self {
self.transport_config.enable_websocket = true;
self
}
#[cfg(feature = "websocket")]
pub fn enable_secure_websocket(mut self, pem: Option<(Vec<String>, String)>) -> Self {
self.transport_config.enable_secure_websocket = true;
self.transport_config.enable_websocket = true;
self.transport_config.websocket_pem = pem;
self
}
#[cfg(feature = "dns")]
pub fn enable_dns(self) -> Self {
self.enable_dns_with_resolver(DnsResolver::default())
}
#[cfg(feature = "dns")]
pub fn enable_dns_with_resolver(mut self, resolver: DnsResolver) -> Self {
self.transport_config.dns_resolver = Some(resolver);
self.transport_config.enable_dns = true;
self
}
pub fn enable_memory_transport(mut self) -> Self {
self.transport_config.enable_memory_transport = true;
self
}
pub fn start(self) -> std::io::Result<Connexa<T>> {
let ConnexaBuilder {
keypair,
context,
custom_behaviour,
file_descriptor_limits,
custom_task_callback,
custom_event_callback,
swarm_event_callback,
config,
protocols,
swarm_config,
transport_config,
} = self;
let span = Span::current();
if let Some(limit) = file_descriptor_limits {
#[cfg(unix)]
{
let (_, hard) = rlimit::Resource::NOFILE.get()?;
let limit = match limit {
FileDescLimit::Max => hard,
FileDescLimit::Custom(limit) => limit,
};
let target = std::cmp::min(hard, limit);
rlimit::Resource::NOFILE.set(target, hard)?;
let (soft, _) = rlimit::Resource::NOFILE.get()?;
if soft < 2048 {
tracing::warn!("Limit is too low: {soft}");
}
}
#[cfg(not(unix))]
{
tracing::warn!(
?limit,
"fd limit can only be set on unix systems. Ignoring..."
)
}
}
let peer_id = keypair.public().to_peer_id();
let swarm_config = swarm_config(libp2p::swarm::Config::with_executor(ConnexaExecutor));
let custom_behaviour = custom_behaviour.map(|custom_fn| custom_fn(keypair.clone()));
let (behaviour, relay_transport) =
behaviour::Behaviour::new(&keypair, custom_behaviour, config, protocols)?;
let transport =
transport::build_transport(keypair.clone(), relay_transport, transport_config)?;
let swarm = Swarm::new(transport, behaviour, peer_id, swarm_config);
let connexa_task = ConnexaTask::new(swarm);
let to_task = async_rt::task::spawn_coroutine_with_context(
(
context,
custom_task_callback,
custom_event_callback,
swarm_event_callback,
connexa_task,
),
|(context, tcb, ecb, scb, mut ctx), rx| async move {
ctx.set_context(context);
ctx.set_task_callback(tcb);
ctx.set_event_callback(ecb);
ctx.set_command_receiver(rx);
ctx.set_swarm_event_callback(scb);
ctx.await
},
);
let connexa = Connexa::new(span, keypair, to_task);
Ok(connexa)
}
}