mod stream;
use std::convert::TryInto;
use std::num::{NonZeroU8, NonZeroUsize};
use crate::error::Error;
use crate::repo::Repo;
use crate::{IpfsOptions, TTransportFn};
use either::Either;
use libp2p::gossipsub::ValidationMode;
use libp2p::identify::Info as IdentifyInfo;
use libp2p::identity::{Keypair, PublicKey};
use libp2p::kad::KademliaConfig;
use libp2p::ping::Config as PingConfig;
use libp2p::swarm::NetworkBehaviour;
use libp2p::Swarm;
use libp2p::{Multiaddr, PeerId};
use tracing::Span;
pub(crate) mod addr;
pub(crate) mod addressbook;
pub(crate) mod peerbook;
mod behaviour;
pub use self::addressbook::Config as AddressBookConfig;
pub use self::behaviour::BehaviourEvent;
pub use self::behaviour::IdentifyConfiguration;
pub use self::behaviour::{BitswapConfig, BitswapProtocol};
pub use self::behaviour::{KadConfig, KadInserts, KadStoreConfig};
pub use self::behaviour::{RateLimit, RelayConfig};
pub use self::peerbook::ConnectionLimits;
pub use self::stream::ProviderStream;
pub use self::stream::RecordStream;
pub use self::transport::{
DnsResolver, MultiPlexOption, TransportConfig, UpdateMode, UpgradeVersion,
};
pub(crate) mod gossipsub;
mod transport;
pub use addr::MultiaddrExt;
pub use behaviour::KadResult;
/// Swarm type returned by `create_swarm`: a libp2p `Swarm` driving this module's
/// `behaviour::Behaviour`, parameterised by the type `C` of the optional custom
/// behaviour handed to `create_swarm`.
pub type TSwarm<C> = Swarm<behaviour::Behaviour<C>>;
/// Information about a peer, as carried by a libp2p identify `Info` value
/// (see the `From<IdentifyInfo>` conversion in this module).
///
/// Equality and hashing are implemented by hand in this module and only take
/// `peer_id` and `public_key` into account; the remaining fields are ignored.
#[derive(Clone, Debug, Eq)]
pub struct PeerInfo {
/// Peer id; derived from `public_key` when converting from `IdentifyInfo`.
pub peer_id: PeerId,
/// Public key of the peer.
pub public_key: PublicKey,
/// Protocol version string, copied verbatim from the identify info.
pub protocol_version: String,
/// Agent version string, copied verbatim from the identify info.
pub agent_version: String,
/// Listen addresses, copied verbatim from the identify info.
pub listen_addrs: Vec<Multiaddr>,
/// Protocol names, copied verbatim from the identify info.
pub protocols: Vec<String>,
/// Observed address from the identify info. Always `Some` when the value was
/// built through `From<IdentifyInfo>`.
pub observed_addr: Option<Multiaddr>,
}
/// Hashes only the identifying fields (`peer_id`, then `public_key`), mirroring
/// the hand-written `PartialEq` so that equal values hash identically.
impl core::hash::Hash for PeerInfo {
    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
        let Self {
            peer_id,
            public_key,
            ..
        } = self;
        // Feed the hasher in the same order as before: peer id first, key second.
        core::hash::Hash::hash(peer_id, state);
        core::hash::Hash::hash(public_key, state);
    }
}
/// Two `PeerInfo` values are equal when both `peer_id` and `public_key` match;
/// every other field is deliberately left out of the comparison.
impl PartialEq for PeerInfo {
    fn eq(&self, other: &Self) -> bool {
        let Self {
            peer_id,
            public_key,
            ..
        } = self;
        // Short-circuits on the peer id before comparing the keys.
        *peer_id == other.peer_id && *public_key == other.public_key
    }
}
/// Converts the payload of a libp2p identify exchange into a [`PeerInfo`].
impl From<IdentifyInfo> for PeerInfo {
    /// Moves every field of `info` into the new value, derives `peer_id` from the
    /// public key and wraps the observed address in `Some`.
    fn from(info: IdentifyInfo) -> Self {
        let IdentifyInfo {
            public_key,
            protocol_version,
            agent_version,
            listen_addrs,
            protocols,
            observed_addr,
        } = info;

        // Derive the id by reference instead of cloning the whole key just to
        // convert it; this is the same call `create_swarm` uses for the local key.
        let peer_id = public_key.to_peer_id();

        Self {
            peer_id,
            public_key,
            protocol_version,
            agent_version,
            listen_addrs,
            protocols,
            observed_addr: Some(observed_addr),
        }
    }
}
/// Options handed to `create_swarm`, which forwards them to
/// `behaviour::build_behaviour`.
///
/// Every field is copied or cloned from the corresponding `IpfsOptions` field by
/// the `From<&IpfsOptions>` conversion in this module.
///
/// NOTE(review): the consumer of these options is not in this file; the meaning of
/// each flag below is inferred from its name and type only — confirm against
/// `behaviour::build_behaviour`.
pub struct SwarmOptions {
/// From `IpfsOptions::bootstrap`; presumably the bootstrap node addresses.
pub bootstrap: Vec<Multiaddr>,
/// From `IpfsOptions::mdns`; presumably toggles mDNS discovery.
pub mdns: bool,
/// From `IpfsOptions::mdns_ipv6`; presumably selects IPv6 for mDNS.
pub mdns_ipv6: bool,
/// From `IpfsOptions::disable_kad`; presumably turns Kademlia off.
pub disable_kad: bool,
/// From `IpfsOptions::disable_bitswap`; presumably turns bitswap off.
pub disable_bitswap: bool,
/// From `IpfsOptions::relay_server`; presumably enables the relay server role.
pub relay_server: bool,
/// From `IpfsOptions::relay_server_config`.
pub relay_server_config: Option<RelayConfig>,
/// From `IpfsOptions::kad_configuration`: either this crate's `KadConfig` or a
/// libp2p `KademliaConfig`.
pub kad_config: Option<Either<KadConfig, KademliaConfig>>,
/// From `IpfsOptions::ping_configuration`.
pub ping_config: Option<PingConfig>,
/// From `IpfsOptions::bitswap_config`.
pub bitswap_config: Option<BitswapConfig>,
/// From `IpfsOptions::identify_configuration`.
pub identify_config: Option<IdentifyConfiguration>,
/// From `IpfsOptions::kad_store_config`.
pub kad_store_config: KadStoreConfig,
/// From `IpfsOptions::pubsub_config`.
pub pubsub_config: Option<PubsubConfig>,
/// From `IpfsOptions::addr_config`.
pub addrbook_config: Option<AddressBookConfig>,
/// From `IpfsOptions::port_mapping`; presumably toggles port mapping.
pub portmapping: bool,
/// From `IpfsOptions::keep_alive`; presumably keeps connections alive.
pub keep_alive: bool,
/// From `IpfsOptions::relay`; presumably enables the relay client.
pub relay: bool,
/// From `IpfsOptions::dcutr`; presumably enables DCUtR.
pub dcutr: bool,
}
impl From<&IpfsOptions> for SwarmOptions {
fn from(options: &IpfsOptions) -> Self {
let bootstrap = options.bootstrap.clone();
let mdns = options.mdns;
let mdns_ipv6 = options.mdns_ipv6;
let dcutr = options.dcutr;
let relay_server = options.relay_server;
let relay_server_config = options.relay_server_config.clone();
let relay = options.relay;
let kad_config = options.kad_configuration.clone();
let ping_config = options.ping_configuration.clone();
let kad_store_config = options.kad_store_config.clone();
let disable_kad = options.disable_kad;
let disable_bitswap = options.disable_bitswap;
let bitswap_config = options.bitswap_config.clone();
let keep_alive = options.keep_alive;
let identify_config = options.identify_configuration.clone();
let portmapping = options.port_mapping;
let pubsub_config = options.pubsub_config.clone();
let addrbook_config = options.addr_config;
SwarmOptions {
bootstrap,
mdns,
disable_kad,
disable_bitswap,
bitswap_config,
mdns_ipv6,
relay_server,
relay_server_config,
relay,
dcutr,
kad_config,
kad_store_config,
ping_config,
keep_alive,
identify_config,
portmapping,
addrbook_config,
pubsub_config,
}
}
}
/// Pubsub settings carried in `SwarmOptions::pubsub_config`.
///
/// See the `Default` impl in this module for the default values noted below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubsubConfig {
/// Optional custom protocol id; `None` by default.
pub custom_protocol_id: Option<String>,
/// Maximum transmit size; defaults to `2 * 1024 * 1024`
/// (presumably bytes — TODO confirm where the value is consumed).
pub max_transmit_size: usize,
/// Presumably enables floodsub compatibility — TODO confirm; `false` by default.
pub floodsub_compat: bool,
/// Validation setting, convertible into a gossipsub `ValidationMode`;
/// `PubsubValidation::Strict` by default.
pub validate: PubsubValidation,
}
/// Crate-level mirror of the gossipsub `ValidationMode`, converted through the
/// `From<PubsubValidation> for ValidationMode` impl in this module.
///
/// `PartialOrd`/`Ord` are derived, so the ordering follows the declaration order
/// of the variants; reordering them changes comparison results.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PubsubValidation {
/// Converts to `ValidationMode::Strict`.
Strict,
/// Converts to `ValidationMode::Permissive`.
Permissive,
/// Converts to `ValidationMode::Anonymous`.
Anonymous,
/// Converts to `ValidationMode::None`.
Relaxed,
}
impl From<PubsubValidation> for ValidationMode {
fn from(validation: PubsubValidation) -> Self {
match validation {
PubsubValidation::Strict => ValidationMode::Strict,
PubsubValidation::Permissive => ValidationMode::Permissive,
PubsubValidation::Anonymous => ValidationMode::Anonymous,
PubsubValidation::Relaxed => ValidationMode::None,
}
}
}
/// Default pubsub settings: no custom protocol id, a `2 * 1024 * 1024` transmit
/// limit, strict validation and floodsub compatibility switched off.
impl Default for PubsubConfig {
    fn default() -> Self {
        // Named so the magnitude of the limit is obvious at a glance.
        const DEFAULT_MAX_TRANSMIT_SIZE: usize = 2 * 1024 * 1024;

        Self {
            custom_protocol_id: None,
            max_transmit_size: DEFAULT_MAX_TRANSMIT_SIZE,
            floodsub_compat: false,
            validate: PubsubValidation::Strict,
        }
    }
}
/// Tuning values that `create_swarm` applies while building the swarm.
///
/// See the `Default` impl in this module for the default values noted below.
#[derive(Clone)]
pub struct SwarmConfig {
/// Connection limits; passed to `behaviour::build_behaviour` by `create_swarm`.
/// Defaults to `ConnectionLimits::default()`.
pub connection: ConnectionLimits,
/// Passed to `SwarmBuilder::dial_concurrency_factor`. Defaults to 8.
pub dial_concurrency_factor: NonZeroU8,
/// Passed to `SwarmBuilder::notify_handler_buffer_size`. Defaults to 256.
pub notify_handler_buffer_size: NonZeroUsize,
/// Passed to `SwarmBuilder::per_connection_event_buffer_size`. Defaults to 256.
pub connection_event_buffer_size: usize,
/// Passed to `SwarmBuilder::max_negotiating_inbound_streams`. Defaults to 128.
pub max_inbound_stream: usize,
}
/// Default swarm tuning: default connection limits, dial concurrency factor 8,
/// both buffer sizes 256 and at most 128 negotiating inbound streams.
impl Default for SwarmConfig {
    fn default() -> Self {
        // Both literals are non-zero, so neither `expect` can fire.
        let dial_concurrency_factor = NonZeroU8::new(8).expect("8 > 0");
        let notify_handler_buffer_size = NonZeroUsize::new(256).expect("256 > 0");

        Self {
            connection: ConnectionLimits::default(),
            dial_concurrency_factor,
            notify_handler_buffer_size,
            connection_event_buffer_size: 256,
            max_inbound_stream: 128,
        }
    }
}
#[allow(clippy::type_complexity)]
pub async fn create_swarm<C: NetworkBehaviour<OutEvent = void::Void>>(
keypair: &Keypair,
options: SwarmOptions,
swarm_config: SwarmConfig,
transport_config: TransportConfig,
repo: Repo,
span: Span,
(custom, custom_transport): (Option<C>, Option<TTransportFn>),
) -> Result<TSwarm<C>, Error> {
let keypair = keypair.clone();
let peer_id = keypair.public().to_peer_id();
let (behaviour, relay_transport) =
behaviour::build_behaviour(&keypair, options, repo, swarm_config.connection, custom)
.await?;
let transport = match custom_transport {
Some(transport) => transport(&keypair, relay_transport)?,
None => transport::build_transport(keypair, relay_transport, transport_config)?,
};
let swarm = libp2p::swarm::SwarmBuilder::with_executor(
transport,
behaviour,
peer_id,
SpannedExecutor(span),
)
.notify_handler_buffer_size(swarm_config.notify_handler_buffer_size)
.per_connection_event_buffer_size(swarm_config.connection_event_buffer_size)
.dial_concurrency_factor(swarm_config.dial_concurrency_factor)
.max_negotiating_inbound_streams(swarm_config.max_inbound_stream)
.build();
Ok(swarm)
}
/// Swarm executor that attaches the wrapped tracing span to every future it is
/// asked to run and spawns the result as a tokio task.
struct SpannedExecutor(Span);

impl libp2p::swarm::Executor for SpannedExecutor {
    /// Instruments `future` with a clone of the stored span and spawns it with
    /// `tokio::task::spawn`; the returned join handle is dropped.
    fn exec(
        &self,
        future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>>,
    ) {
        use tracing_futures::Instrument;

        let Self(span) = self;
        let instrumented = future.instrument(span.clone());
        tokio::task::spawn(instrumented);
    }
}