use super::{addressbook, protocol};
use super::peerbook::{self};
use serde::{Deserialize, Serialize};
use crate::repo::DefaultStorage;
use crate::{IntoAddPeerOpt, IpfsOptions};
use crate::repo::Repo;
use ipld_core::cid::Cid;
use connexa::prelude::dht::Record;
use connexa::prelude::identity::{Keypair, PublicKey};
use connexa::prelude::swarm::behaviour::toggle::Toggle;
use connexa::prelude::swarm::NetworkBehaviour;
use connexa::prelude::{identify, relay, Multiaddr, PeerId};
use std::fmt::Debug;
use std::num::NonZeroU32;
use std::time::Duration;
#[derive(NetworkBehaviour)]
#[behaviour(prelude = "connexa::prelude::swarm::derive_prelude")]
pub struct Behaviour<C>
where
C: NetworkBehaviour,
<C as NetworkBehaviour>::ToSwarm: Debug + Send,
{
pub addressbook: addressbook::Behaviour,
pub relay_manager: Toggle<libp2p_relay_manager::Behaviour>,
pub bitswap: Toggle<super::bitswap::Behaviour>,
pub custom: Toggle<C>,
pub peerbook: peerbook::Behaviour,
pub protocol: protocol::Behaviour,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum KadResult {
Complete,
Peers(Vec<PeerId>),
Records(Vec<Record>),
Record(Record),
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RelayConfig {
pub max_reservations: usize,
pub max_reservations_per_peer: usize,
pub reservation_duration: Duration,
pub reservation_rate_limiters: Vec<RateLimit>,
pub max_circuits: usize,
pub max_circuits_per_peer: usize,
pub max_circuit_duration: Duration,
pub max_circuit_bytes: u64,
pub circuit_src_rate_limiters: Vec<RateLimit>,
}
impl Default for RelayConfig {
fn default() -> Self {
let limiters = vec![
RateLimit::PerPeer {
limit: NonZeroU32::new(30).expect("30 > 0"),
interval: Duration::from_secs(60 * 2),
},
RateLimit::PerIp {
limit: NonZeroU32::new(60).expect("60 > 0"),
interval: Duration::from_secs(60),
},
];
Self {
max_reservations: 128,
max_reservations_per_peer: 4,
reservation_duration: Duration::from_secs(60 * 60),
reservation_rate_limiters: limiters.clone(),
max_circuits: 16,
max_circuits_per_peer: 4,
max_circuit_duration: Duration::from_secs(2 * 60),
max_circuit_bytes: 1 << 17,
circuit_src_rate_limiters: limiters,
}
}
}
impl RelayConfig {
pub fn unbounded() -> Self {
Self {
max_circuits: usize::MAX,
max_circuit_bytes: u64::MAX,
max_circuit_duration: Duration::MAX,
max_circuits_per_peer: usize::MAX,
max_reservations: usize::MAX,
reservation_duration: Duration::MAX,
max_reservations_per_peer: usize::MAX,
reservation_rate_limiters: vec![],
circuit_src_rate_limiters: vec![],
}
}
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct IdentifyConfiguration {
pub protocol_version: String,
pub agent_version: String,
pub interval: Duration,
pub push_update: bool,
pub cache: usize,
}
impl Default for IdentifyConfiguration {
fn default() -> Self {
Self {
protocol_version: "/ipfs/0.1.0".into(),
agent_version: "rust-ipfs".into(),
interval: Duration::from_secs(5 * 60),
push_update: true,
cache: 100,
}
}
}
impl IdentifyConfiguration {
pub fn into(self, publuc_key: PublicKey) -> identify::Config {
identify::Config::new(self.protocol_version, publuc_key)
.with_agent_version(self.agent_version)
.with_interval(self.interval)
.with_push_listen_addr_updates(self.push_update)
.with_cache_size(self.cache)
}
}
impl From<RelayConfig> for relay::server::Config {
fn from(
RelayConfig {
max_reservations,
max_reservations_per_peer,
reservation_duration,
max_circuits,
max_circuits_per_peer,
max_circuit_duration,
max_circuit_bytes,
reservation_rate_limiters,
circuit_src_rate_limiters,
}: RelayConfig,
) -> Self {
let reservation_duration = max_duration(reservation_duration);
let max_circuit_duration = max_duration(max_circuit_duration);
let mut config = relay::server::Config {
max_reservations,
max_reservations_per_peer,
reservation_duration,
max_circuits,
max_circuits_per_peer,
max_circuit_duration,
max_circuit_bytes,
..Default::default()
};
for rate in circuit_src_rate_limiters {
match rate {
RateLimit::PerPeer { limit, interval } => {
config = config.circuit_src_per_peer(limit, interval);
}
RateLimit::PerIp { limit, interval } => {
config = config.circuit_src_per_ip(limit, interval);
}
}
}
for rate in reservation_rate_limiters {
match rate {
RateLimit::PerPeer { limit, interval } => {
config = config.reservation_rate_per_peer(limit, interval);
}
RateLimit::PerIp { limit, interval } => {
config = config.reservation_rate_per_ip(limit, interval);
}
}
}
config
}
}
fn max_duration(duration: Duration) -> Duration {
let start = web_time::Instant::now();
if start.checked_add(duration).is_none() {
return Duration::from_secs(u32::MAX as _);
}
duration
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub enum RateLimit {
PerPeer {
limit: NonZeroU32,
interval: Duration,
},
PerIp {
limit: NonZeroU32,
interval: Duration,
},
}
impl<C> Behaviour<C>
where
C: NetworkBehaviour,
<C as NetworkBehaviour>::ToSwarm: Debug + Send,
{
pub(crate) fn new(
keypair: &Keypair,
options: &IpfsOptions,
repo: &Repo<DefaultStorage>,
custom: Option<C>,
) -> Self {
let bootstrap = options.bootstrap.clone();
let protocols = options.protocols;
let peer_id = keypair.public().to_peer_id();
info!("net: starting with peer id {}", peer_id);
let bitswap = protocols
.bitswap
.then(|| super::bitswap::Behaviour::new(repo))
.into();
let relay_manager = protocols
.relay
.then(|| libp2p_relay_manager::Behaviour::default())
.into();
let peerbook = peerbook::Behaviour::default();
let addressbook = addressbook::Behaviour::with_config(options.addr_config);
let protocol = protocol::Behaviour::default();
let custom = Toggle::from(custom);
let mut behaviour = Behaviour {
bitswap,
relay_manager,
peerbook,
addressbook,
protocol,
custom,
};
for addr in bootstrap {
let Ok(mut opt) = IntoAddPeerOpt::into_opt(addr) else {
continue;
};
opt = opt.set_dial(true);
_ = behaviour.add_peer(opt);
}
behaviour
}
pub fn add_peer<I: IntoAddPeerOpt>(&mut self, opt: I) -> bool {
let opt = opt.into_opt().expect("valid entries");
self.addressbook.add_address(opt);
true
}
pub fn remove_peer(&mut self, peer: &PeerId) {
self.addressbook.remove_peer(peer);
}
pub fn addrs(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
self.peerbook.connected_peers_addrs().collect()
}
pub fn stop_providing_block(&mut self, cid: &Cid) {
info!("Finished providing block {}", cid.to_string());
let _key = cid.hash().to_bytes();
}
pub fn supported_protocols(&self) -> Vec<String> {
self.protocol.iter().collect::<Vec<_>>()
}
}