rust_ipfs/p2p/
mod.rs

1//! P2P handling for IPFS nodes.
2use crate::error::Error;
3use crate::repo::Repo;
4use crate::rt::{Executor, ExecutorSwitch};
5use crate::{IpfsOptions, TTransportFn};
6use std::convert::TryInto;
7use std::num::{NonZeroU8, NonZeroUsize};
8use std::time::Duration;
9
10use libp2p::gossipsub::ValidationMode;
11use libp2p::identify::Info as IdentifyInfo;
12use libp2p::identity::{Keypair, PublicKey};
13use libp2p::request_response::ProtocolSupport;
14use libp2p::swarm::NetworkBehaviour;
15use libp2p::{Multiaddr, PeerId};
16use libp2p::{StreamProtocol, Swarm};
17use tracing::Span;
18
19pub(crate) mod addr;
20pub(crate) mod addressbook;
21pub mod bitswap;
22pub(crate) mod peerbook;
23pub mod protocol;
24pub(crate) mod rr_man;
25
26mod behaviour;
27pub use self::addressbook::Config as AddressBookConfig;
28pub use self::behaviour::BehaviourEvent;
29pub use self::behaviour::IdentifyConfiguration;
30
31pub use self::behaviour::{KadConfig, KadInserts, KadStoreConfig};
32pub use self::behaviour::{RateLimit, RelayConfig};
33#[cfg(not(target_arch = "wasm32"))]
34pub use self::transport::generate_cert;
35pub use self::transport::{DnsResolver, TransportConfig, UpgradeVersion};
36pub(crate) mod gossipsub;
37mod request_response;
38mod transport;
39
40pub use addr::MultiaddrExt;
41pub use behaviour::KadResult;
42
43pub(crate) type TSwarm<C> = Swarm<behaviour::Behaviour<C>>;
44
45/// Abstraction of IdentifyInfo but includes PeerId
46#[derive(Clone, Debug, Eq)]
47pub struct PeerInfo {
48    /// The peer id of the user
49    pub peer_id: PeerId,
50
51    /// The public key of the local peer.
52    pub public_key: PublicKey,
53
54    /// Application-specific version of the protocol family used by the peer,
55    /// e.g. `ipfs/1.0.0` or `polkadot/1.0.0`.
56    pub protocol_version: String,
57
58    /// Name and version of the peer, similar to the `User-Agent` header in
59    /// the HTTP protocol.
60    pub agent_version: String,
61
62    /// The addresses that the peer is listening on.
63    pub listen_addrs: Vec<Multiaddr>,
64
65    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
66    pub protocols: Vec<StreamProtocol>,
67
68    /// Address observed by or for the remote.
69    pub observed_addr: Option<Multiaddr>,
70}
71
72impl core::hash::Hash for PeerInfo {
73    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
74        self.peer_id.hash(state);
75        self.public_key.hash(state);
76    }
77}
78
79impl PartialEq for PeerInfo {
80    fn eq(&self, other: &Self) -> bool {
81        self.peer_id == other.peer_id && self.public_key == other.public_key
82    }
83}
84
85impl From<IdentifyInfo> for PeerInfo {
86    fn from(info: IdentifyInfo) -> Self {
87        let IdentifyInfo {
88            public_key,
89            protocol_version,
90            agent_version,
91            listen_addrs,
92            protocols,
93            observed_addr,
94        } = info;
95        let peer_id = public_key.to_peer_id();
96        let observed_addr = Some(observed_addr);
97        Self {
98            peer_id,
99            public_key,
100            protocol_version,
101            agent_version,
102            listen_addrs,
103            protocols,
104            observed_addr,
105        }
106    }
107}
108
109#[derive(Debug, Clone, PartialEq, Eq)]
110pub struct PubsubConfig {
111    /// Custom protocol name
112    pub custom_protocol_id: Option<String>,
113
114    /// Max size that can be transmitted over gossipsub
115    pub max_transmit_size: usize,
116
117    /// Floodsub compatibility
118    pub floodsub_compat: bool,
119
120    /// Validation
121    pub validate: PubsubValidation,
122}
123
124#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
125pub enum PubsubValidation {
126    /// See [`ValidationMode::Strict`]
127    Strict,
128
129    /// See [`ValidationMode::Permissive`]
130    Permissive,
131
132    /// See [`ValidationMode::Anonymous`]
133    Anonymous,
134
135    /// See [`ValidationMode::None`]
136    Relaxed,
137}
138
139impl From<PubsubValidation> for ValidationMode {
140    fn from(validation: PubsubValidation) -> Self {
141        match validation {
142            PubsubValidation::Strict => ValidationMode::Strict,
143            PubsubValidation::Permissive => ValidationMode::Permissive,
144            PubsubValidation::Anonymous => ValidationMode::Anonymous,
145            PubsubValidation::Relaxed => ValidationMode::None,
146        }
147    }
148}
149
150impl Default for PubsubConfig {
151    fn default() -> Self {
152        Self {
153            custom_protocol_id: None,
154            max_transmit_size: 2 * 1024 * 1024,
155            validate: PubsubValidation::Strict,
156            floodsub_compat: false,
157        }
158    }
159}
160
161#[derive(Debug, Clone)]
162pub struct RequestResponseConfig {
163    pub protocol: String,
164    pub timeout: Option<Duration>,
165    pub max_request_size: usize,
166    pub max_response_size: usize,
167    pub concurrent_streams: Option<usize>,
168    pub channel_buffer: usize,
169    pub protocol_direction: RequestResponseDirection,
170}
171
172#[derive(Debug, Clone, Default)]
173pub enum RequestResponseDirection {
174    In,
175    Out,
176    #[default]
177    Both,
178}
179
180impl From<RequestResponseDirection> for ProtocolSupport {
181    fn from(direction: RequestResponseDirection) -> Self {
182        match direction {
183            RequestResponseDirection::In => ProtocolSupport::Inbound,
184            RequestResponseDirection::Out => ProtocolSupport::Outbound,
185            RequestResponseDirection::Both => ProtocolSupport::Full,
186        }
187    }
188}
189
190impl Default for RequestResponseConfig {
191    fn default() -> Self {
192        Self {
193            protocol: "/ipfs/request-response".into(),
194            timeout: None,
195            max_request_size: 512 * 1024,
196            max_response_size: 2 * 1024 * 1024,
197            concurrent_streams: None,
198            channel_buffer: 128,
199            protocol_direction: RequestResponseDirection::default(),
200        }
201    }
202}
203
204#[derive(Clone)]
205pub struct SwarmConfig {
206    pub dial_concurrency_factor: NonZeroU8,
207    pub notify_handler_buffer_size: NonZeroUsize,
208    pub connection_event_buffer_size: usize,
209    pub max_inbound_stream: usize,
210}
211
212impl Default for SwarmConfig {
213    fn default() -> Self {
214        Self {
215            dial_concurrency_factor: 8.try_into().expect("8 > 0"),
216            notify_handler_buffer_size: 32.try_into().expect("256 > 0"),
217            connection_event_buffer_size: 7,
218            max_inbound_stream: 10_000,
219        }
220    }
221}
222
223#[allow(clippy::type_complexity)]
224#[allow(deprecated)]
225//TODO: use libp2p::SwarmBuilder
226/// Creates a new IPFS swarm.
227pub(crate) fn create_swarm<C>(
228    keypair: &Keypair,
229    options: &IpfsOptions,
230    executor: ExecutorSwitch,
231    repo: &Repo,
232    span: Span,
233    (custom, custom_transport): (Option<C>, Option<TTransportFn>),
234) -> Result<TSwarm<C>, Error>
235where
236    C: NetworkBehaviour,
237    <C as NetworkBehaviour>::ToSwarm: std::fmt::Debug + Send,
238{
239    let keypair = keypair.clone();
240    let peer_id = keypair.public().to_peer_id();
241
242    let swarm_config = options.swarm_configuration.clone();
243    let transport_config = options.transport_configuration.clone();
244
245    let idle = options.connection_idle;
246
247    let (behaviour, relay_transport) = behaviour::Behaviour::new(&keypair, options, repo, custom)?;
248
249    // Set up an encrypted TCP transport over the Yamux. If relay transport is supplied, that will be apart
250    let transport = match custom_transport {
251        Some(transport) => transport(&keypair, relay_transport)?,
252        None => transport::build_transport(keypair, relay_transport, transport_config)?,
253    };
254
255    let swarm = libp2p::Swarm::new(
256        transport,
257        behaviour,
258        peer_id,
259        libp2p::swarm::Config::with_executor(SpannedExecutor { executor, span })
260            .with_notify_handler_buffer_size(swarm_config.notify_handler_buffer_size)
261            .with_per_connection_event_buffer_size(swarm_config.connection_event_buffer_size)
262            .with_dial_concurrency_factor(swarm_config.dial_concurrency_factor)
263            .with_max_negotiating_inbound_streams(swarm_config.max_inbound_stream)
264            .with_idle_connection_timeout(idle),
265    );
266
267    Ok(swarm)
268}
269
270struct SpannedExecutor {
271    executor: ExecutorSwitch,
272    span: Span,
273}
274
275impl libp2p::swarm::Executor for SpannedExecutor {
276    fn exec(
277        &self,
278        future: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + 'static + Send>>,
279    ) {
280        use tracing_futures::Instrument;
281        self.executor.dispatch(future.instrument(self.span.clone()));
282    }
283}