Skip to main content

rust_ipfs/p2p/
behaviour.rs

1use super::{addressbook, protocol};
2
3use super::peerbook::{self};
4use serde::{Deserialize, Serialize};
5
6use crate::repo::DefaultStorage;
7use crate::{IntoAddPeerOpt, IpfsOptions};
8
9use crate::repo::Repo;
10
11use ipld_core::cid::Cid;
12
13use connexa::prelude::dht::Record;
14use connexa::prelude::identity::{Keypair, PublicKey};
15use connexa::prelude::swarm::behaviour::toggle::Toggle;
16use connexa::prelude::swarm::NetworkBehaviour;
17use connexa::prelude::{identify, relay, Multiaddr, PeerId};
18use std::fmt::Debug;
19use std::num::NonZeroU32;
20use std::time::Duration;
21
22/// Behaviour type.
23#[derive(NetworkBehaviour)]
24#[behaviour(prelude = "connexa::prelude::swarm::derive_prelude")]
25pub struct Behaviour<C>
26where
27    C: NetworkBehaviour,
28    <C as NetworkBehaviour>::ToSwarm: Debug + Send,
29{
30    pub addressbook: addressbook::Behaviour,
31
32    // networking
33    pub relay_manager: Toggle<libp2p_relay_manager::Behaviour>,
34
35    pub bitswap: Toggle<super::bitswap::Behaviour>,
36
37    // custom behaviours
38    pub custom: Toggle<C>,
39
40    // misc
41    pub peerbook: peerbook::Behaviour,
42    pub protocol: protocol::Behaviour,
43}
44
45/// Represents the result of a Kademlia query.
46#[derive(Debug, Clone, PartialEq, Eq)]
47pub enum KadResult {
48    /// The query has been exhausted.
49    Complete,
50    /// The query successfully returns `GetClosestPeers` or `GetProviders` results.
51    Peers(Vec<PeerId>),
52    /// The query successfully returns a `GetRecord` result.
53    Records(Vec<Record>),
54    Record(Record),
55}
56
57#[derive(Serialize, Deserialize, Clone, Debug)]
58pub struct RelayConfig {
59    pub max_reservations: usize,
60    pub max_reservations_per_peer: usize,
61    pub reservation_duration: Duration,
62    pub reservation_rate_limiters: Vec<RateLimit>,
63
64    pub max_circuits: usize,
65    pub max_circuits_per_peer: usize,
66    pub max_circuit_duration: Duration,
67    pub max_circuit_bytes: u64,
68    pub circuit_src_rate_limiters: Vec<RateLimit>,
69}
70
71impl Default for RelayConfig {
72    fn default() -> Self {
73        let limiters = vec![
74            RateLimit::PerPeer {
75                limit: NonZeroU32::new(30).expect("30 > 0"),
76                interval: Duration::from_secs(60 * 2),
77            },
78            RateLimit::PerIp {
79                limit: NonZeroU32::new(60).expect("60 > 0"),
80                interval: Duration::from_secs(60),
81            },
82        ];
83        Self {
84            max_reservations: 128,
85            max_reservations_per_peer: 4,
86            reservation_duration: Duration::from_secs(60 * 60),
87            reservation_rate_limiters: limiters.clone(),
88            max_circuits: 16,
89            max_circuits_per_peer: 4,
90            max_circuit_duration: Duration::from_secs(2 * 60),
91            max_circuit_bytes: 1 << 17,
92            circuit_src_rate_limiters: limiters,
93        }
94    }
95}
96
97impl RelayConfig {
98    /// Configuration to allow a connection to the relay without limits
99    pub fn unbounded() -> Self {
100        Self {
101            max_circuits: usize::MAX,
102            max_circuit_bytes: u64::MAX,
103            max_circuit_duration: Duration::MAX,
104            max_circuits_per_peer: usize::MAX,
105            max_reservations: usize::MAX,
106            reservation_duration: Duration::MAX,
107            max_reservations_per_peer: usize::MAX,
108            reservation_rate_limiters: vec![],
109            circuit_src_rate_limiters: vec![],
110        }
111    }
112}
113
114#[derive(Serialize, Deserialize, Clone, Debug)]
115pub struct IdentifyConfiguration {
116    pub protocol_version: String,
117    pub agent_version: String,
118    pub interval: Duration,
119    pub push_update: bool,
120    pub cache: usize,
121}
122
123impl Default for IdentifyConfiguration {
124    fn default() -> Self {
125        Self {
126            protocol_version: "/ipfs/0.1.0".into(),
127            agent_version: "rust-ipfs".into(),
128            interval: Duration::from_secs(5 * 60),
129            push_update: true,
130            cache: 100,
131        }
132    }
133}
134
135impl IdentifyConfiguration {
136    pub fn into(self, publuc_key: PublicKey) -> identify::Config {
137        identify::Config::new(self.protocol_version, publuc_key)
138            .with_agent_version(self.agent_version)
139            .with_interval(self.interval)
140            .with_push_listen_addr_updates(self.push_update)
141            .with_cache_size(self.cache)
142    }
143}
144
145impl From<RelayConfig> for relay::server::Config {
146    fn from(
147        RelayConfig {
148            max_reservations,
149            max_reservations_per_peer,
150            reservation_duration,
151            max_circuits,
152            max_circuits_per_peer,
153            max_circuit_duration,
154            max_circuit_bytes,
155            reservation_rate_limiters,
156            circuit_src_rate_limiters,
157        }: RelayConfig,
158    ) -> Self {
159        let reservation_duration = max_duration(reservation_duration);
160        let max_circuit_duration = max_duration(max_circuit_duration);
161
162        let mut config = relay::server::Config {
163            max_reservations,
164            max_reservations_per_peer,
165            reservation_duration,
166            max_circuits,
167            max_circuits_per_peer,
168            max_circuit_duration,
169            max_circuit_bytes,
170            ..Default::default()
171        };
172
173        for rate in circuit_src_rate_limiters {
174            match rate {
175                RateLimit::PerPeer { limit, interval } => {
176                    config = config.circuit_src_per_peer(limit, interval);
177                }
178                RateLimit::PerIp { limit, interval } => {
179                    config = config.circuit_src_per_ip(limit, interval);
180                }
181            }
182        }
183
184        for rate in reservation_rate_limiters {
185            match rate {
186                RateLimit::PerPeer { limit, interval } => {
187                    config = config.reservation_rate_per_peer(limit, interval);
188                }
189                RateLimit::PerIp { limit, interval } => {
190                    config = config.reservation_rate_per_ip(limit, interval);
191                }
192            }
193        }
194
195        config
196    }
197}
198
199fn max_duration(duration: Duration) -> Duration {
200    let start = web_time::Instant::now();
201    if start.checked_add(duration).is_none() {
202        return Duration::from_secs(u32::MAX as _);
203    }
204    duration
205}
206
207#[derive(Serialize, Deserialize, Clone, Debug)]
208pub enum RateLimit {
209    PerPeer {
210        limit: NonZeroU32,
211        interval: Duration,
212    },
213    PerIp {
214        limit: NonZeroU32,
215        interval: Duration,
216    },
217}
218impl<C> Behaviour<C>
219where
220    C: NetworkBehaviour,
221    <C as NetworkBehaviour>::ToSwarm: Debug + Send,
222{
223    pub(crate) fn new(
224        keypair: &Keypair,
225        options: &IpfsOptions,
226        repo: &Repo<DefaultStorage>,
227        custom: Option<C>,
228    ) -> Self {
229        let bootstrap = options.bootstrap.clone();
230
231        let protocols = options.protocols;
232
233        let peer_id = keypair.public().to_peer_id();
234
235        info!("net: starting with peer id {}", peer_id);
236
237        let bitswap = protocols
238            .bitswap
239            .then(|| super::bitswap::Behaviour::new(repo))
240            .into();
241
242        let relay_manager = protocols
243            .relay
244            .then(|| libp2p_relay_manager::Behaviour::default())
245            .into();
246
247        let peerbook = peerbook::Behaviour::default();
248
249        let addressbook = addressbook::Behaviour::with_config(options.addr_config);
250
251        let protocol = protocol::Behaviour::default();
252        let custom = Toggle::from(custom);
253
254        let mut behaviour = Behaviour {
255            bitswap,
256            relay_manager,
257            peerbook,
258            addressbook,
259            protocol,
260            custom,
261        };
262
263        for addr in bootstrap {
264            let Ok(mut opt) = IntoAddPeerOpt::into_opt(addr) else {
265                continue;
266            };
267
268            // explicitly dial the bootstrap peer. If the peer will be bootstrapped via kad, the additional dial will be cancelled
269            opt = opt.set_dial(true);
270
271            _ = behaviour.add_peer(opt);
272        }
273
274        behaviour
275    }
276
277    pub fn add_peer<I: IntoAddPeerOpt>(&mut self, opt: I) -> bool {
278        let opt = opt.into_opt().expect("valid entries");
279
280        self.addressbook.add_address(opt);
281
282        true
283    }
284
285    pub fn remove_peer(&mut self, peer: &PeerId) {
286        self.addressbook.remove_peer(peer);
287    }
288
289    pub fn addrs(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
290        self.peerbook.connected_peers_addrs().collect()
291    }
292
293    // TODO
294    pub fn stop_providing_block(&mut self, cid: &Cid) {
295        info!("Finished providing block {}", cid.to_string());
296        let _key = cid.hash().to_bytes();
297        // if let Some(kad) = self.kademlia.as_mut() {
298        //     kad.stop_providing(&key.into());
299        // }
300    }
301
302    pub fn supported_protocols(&self) -> Vec<String> {
303        self.protocol.iter().collect::<Vec<_>>()
304    }
305}