rust_ipfs/p2p/
transport.rs

1#[cfg(not(target_arch = "wasm32"))]
2mod misc;
3
4#[cfg(not(target_arch = "wasm32"))]
5pub use misc::generate_cert;
6
7#[allow(unused_imports)]
8use either::Either;
9#[allow(unused_imports)]
10use futures::future::Either as FutureEither;
11use libp2p::core::muxing::StreamMuxerBox;
12#[allow(unused_imports)]
13use libp2p::core::transport::timeout::TransportTimeout;
14use libp2p::core::transport::upgrade::Version;
15use libp2p::core::transport::{Boxed, MemoryTransport, OrTransport};
16#[cfg(not(target_arch = "wasm32"))]
17use libp2p::dns::{ResolverConfig, ResolverOpts};
18use libp2p::relay::client::Transport as ClientTransport;
19use libp2p::yamux::Config as YamuxConfig;
20use libp2p::{identity, noise};
21use libp2p::{PeerId, Transport};
22use std::io;
23use std::time::Duration;
24
/// Transport type.
///
/// A boxed libp2p transport whose output is the remote [`PeerId`] paired with a boxed stream
/// muxer. This is the type returned by `build_transport`.
pub(crate) type TTransport = Boxed<(PeerId, StreamMuxerBox)>;
27
/// Options controlling which transports `build_transport` assembles and how they are tuned.
///
/// Not every field is read on every target: the `wasm32` build only reads `timeout`, `version`,
/// `enable_websocket`, `enable_secure_websocket`, `enable_webrtc` and `enable_webtransport`.
#[derive(Debug, Clone)]
pub struct TransportConfig {
    /// Timeout applied both to the underlying transport and to the connection upgrade.
    pub timeout: Duration,
    /// Resolver used when `enable_dns` is set; `None` falls back to [`DnsResolver::default`].
    pub dns_resolver: Option<DnsResolver>,
    /// Upgrade (multistream-select) version used when upgrading connections.
    pub version: UpgradeVersion,
    /// Adds the QUIC transport (native targets only).
    pub enable_quic: bool,
    /// QUIC maximum idle timeout; handed to the QUIC config as a millisecond count.
    pub quic_max_idle_timeout: Duration,
    /// QUIC keep-alive interval; `None` uses half of `quic_max_idle_timeout`.
    pub quic_keep_alive: Option<Duration>,
    /// Adds the websocket transport. On `wasm32` the websocket transport is also added when
    /// only `enable_secure_websocket` is set.
    pub enable_websocket: bool,
    /// Wraps the transport stack in a DNS-resolving transport (native targets only).
    pub enable_dns: bool,
    /// Adds the in-memory transport on native targets. The `wasm32` build does not read this
    /// flag and always starts from a memory transport.
    pub enable_memory_transport: bool,
    /// Adds the WebTransport transport (`wasm32` only; ignored on native targets).
    pub enable_webtransport: bool,
    /// PEM-encoded certificate chain and PEM-encoded private key used for secure websockets.
    /// When `None`, a certificate is generated from the node keypair instead.
    pub websocket_pem: Option<(Vec<String>, String)>,
    /// On native targets, configures TLS on the websocket transport; only consulted there when
    /// `enable_websocket` is set.
    pub enable_secure_websocket: bool,
    /// Forwarded to the QUIC config's `support_draft_29` flag.
    pub support_quic_draft_29: bool,
    /// Adds the WebRTC transport; only honoured when the `webrtc_transport` feature is enabled.
    pub enable_webrtc: bool,
    /// PEM-encoded certificate for the native WebRTC transport. When `None`, one is generated
    /// from the node keypair.
    pub webrtc_pem: Option<String>,
}
46
47impl Default for TransportConfig {
48    fn default() -> Self {
49        Self {
50            enable_quic: true,
51            enable_websocket: false,
52            websocket_pem: None,
53            enable_secure_websocket: true,
54            enable_memory_transport: false,
55            support_quic_draft_29: false,
56            enable_dns: true,
57            enable_webtransport: false,
58            enable_webrtc: false,
59            webrtc_pem: None,
60            timeout: Duration::from_secs(10),
61            //Note: This is set low due to quic transport not properly resetting connection state when reconnecting before connection timeout
62            //      While in smaller settings this would be alright, we should be cautious of this setting for nodes with larger connections
63            //      since this may increase cpu and network usage.
64            //      see https://github.com/libp2p/rust-libp2p/issues/5097
65            quic_max_idle_timeout: Duration::from_millis(300),
66            quic_keep_alive: Some(Duration::from_millis(100)),
67            dns_resolver: None,
68            version: UpgradeVersion::default(),
69        }
70    }
71}
72
/// Selects the DNS resolver configuration handed to the DNS transport.
///
/// The default is [`DnsResolver::Cloudflare`].
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum DnsResolver {
    /// Google DNS Resolver
    Google,
    /// Cloudflare DNS Resolver
    #[default]
    Cloudflare,
    /// Local DNS Resolver
    ///
    /// Uses the system resolver configuration.
    Local,
    /// No DNS Resolver
    ///
    /// Maps to an empty resolver configuration.
    None,
}
85
86#[cfg(not(target_arch = "wasm32"))]
87impl From<DnsResolver> for (ResolverConfig, ResolverOpts) {
88    fn from(value: DnsResolver) -> Self {
89        match value {
90            DnsResolver::Google => (ResolverConfig::google(), Default::default()),
91            DnsResolver::Cloudflare => (ResolverConfig::cloudflare(), Default::default()),
92            DnsResolver::Local => {
93                hickory_resolver::system_conf::read_system_conf().unwrap_or_default()
94            }
95            DnsResolver::None => (ResolverConfig::new(), Default::default()),
96        }
97    }
98}
99
/// Upgrade version used when upgrading connections; a thin mirror of libp2p's [`Version`].
///
/// The default is [`UpgradeVersion::Lazy`].
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum UpgradeVersion {
    /// See [`Version::V1`]
    Standard,

    /// See [`Version::V1Lazy`]
    #[default]
    Lazy,
}
109
110impl From<UpgradeVersion> for Version {
111    fn from(value: UpgradeVersion) -> Self {
112        match value {
113            UpgradeVersion::Standard => Version::V1,
114            UpgradeVersion::Lazy => Version::V1Lazy,
115        }
116    }
117}
118
119/// Builds the transport that serves as a common ground for all connections.
120#[cfg(not(target_arch = "wasm32"))]
121#[allow(unused_variables)]
122pub(crate) fn build_transport(
123    keypair: identity::Keypair,
124    relay: Option<ClientTransport>,
125    TransportConfig {
126        timeout,
127        dns_resolver,
128        version,
129        enable_quic,
130        enable_memory_transport,
131        support_quic_draft_29,
132        quic_max_idle_timeout,
133        quic_keep_alive,
134        enable_dns,
135        enable_websocket,
136        enable_secure_websocket,
137        enable_webrtc,
138        webrtc_pem,
139        websocket_pem,
140        enable_webtransport: _,
141    }: TransportConfig,
142) -> io::Result<TTransport> {
143    use crate::p2p::transport::dual_transport::SelectSecurityUpgrade;
144    use libp2p::dns::tokio::Transport as TokioDnsConfig;
145    use libp2p::quic::tokio::Transport as TokioQuicTransport;
146    use libp2p::quic::Config as QuicConfig;
147    use libp2p::tcp::{tokio::Transport as TokioTcpTransport, Config as GenTcpConfig};
148    use libp2p::tls;
149    use misc::generate_cert;
150    use rcgen::KeyPair;
151
152    let noise_config = noise::Config::new(&keypair).map_err(io::Error::other)?;
153    let tls_config = tls::Config::new(&keypair).map_err(io::Error::other)?;
154
155    //TODO: Make configurable
156    let config: SelectSecurityUpgrade<noise::Config, tls::Config> =
157        SelectSecurityUpgrade::new(noise_config, tls_config);
158
159    let yamux_config = YamuxConfig::default();
160
161    let tcp_config = GenTcpConfig::default().nodelay(true);
162
163    let transport = TokioTcpTransport::new(tcp_config.clone());
164
165    let transport = match enable_memory_transport {
166        true => {
167            let mem_ts = MemoryTransport::new();
168            Either::Left(mem_ts.or_transport(transport))
169        }
170        false => Either::Right(transport),
171    };
172
173    let transport = match enable_websocket {
174        true => {
175            let mut ws_transport =
176                libp2p::websocket::WsConfig::new(TokioTcpTransport::new(tcp_config));
177            if enable_secure_websocket {
178                let (certs, priv_key) = match websocket_pem {
179                    Some((cert, kp)) => {
180                        let mut certs = Vec::with_capacity(cert.len());
181                        let kp = KeyPair::from_pem(&kp).map_err(io::Error::other)?;
182                        let priv_key = libp2p::websocket::tls::PrivateKey::new(kp.serialize_der());
183                        for cert in cert.iter().map(|c| c.as_bytes()) {
184                            let pem = pem::parse(cert)
185                                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
186                            let cert =
187                                libp2p::websocket::tls::Certificate::new(pem.into_contents());
188                            certs.push(cert);
189                        }
190
191                        (certs, priv_key)
192                    }
193                    None => {
194                        let (cert, prv, _) = generate_cert(&keypair, b"libp2p-websocket", false)?;
195
196                        let priv_key = libp2p::websocket::tls::PrivateKey::new(prv.serialize_der());
197                        let self_cert =
198                            libp2p::websocket::tls::Certificate::new(cert.der().to_vec());
199
200                        (vec![self_cert], priv_key)
201                    }
202                };
203
204                let tls_config = libp2p::websocket::tls::Config::new(priv_key, certs)
205                    .map_err(io::Error::other)?;
206                ws_transport.set_tls_config(tls_config);
207            }
208            let transport = ws_transport.or_transport(transport);
209            Either::Left(transport)
210        }
211        false => Either::Right(transport),
212    };
213
214    let transport_timeout = TransportTimeout::new(transport, timeout);
215
216    let transport = match enable_dns {
217        true => {
218            let (cfg, opts) = dns_resolver.unwrap_or_default().into();
219            let dns_transport = TokioDnsConfig::custom(transport_timeout, cfg, opts);
220            Either::Left(dns_transport)
221        }
222        false => Either::Right(transport_timeout),
223    };
224
225    let transport = match relay {
226        Some(relay) => Either::Left(OrTransport::new(relay, transport)),
227        None => Either::Right(transport),
228    };
229
230    let transport = transport
231        .upgrade(version.into())
232        .authenticate(config)
233        .multiplex(yamux_config)
234        .timeout(timeout)
235        .boxed();
236
237    #[cfg(feature = "webrtc_transport")]
238    let transport = match enable_webrtc {
239        true => {
240            let cert = match webrtc_pem {
241                Some(pem) => libp2p_webrtc::tokio::Certificate::from_pem(&pem)
242                    .map_err(std::io::Error::other)?,
243                None => {
244                    // This flag is internal, but is meant to allow generating an expired pem to satify webrtc
245                    let expired = true;
246                    let pem = misc::generate_wrtc_cert(&keypair)?;
247
248                    libp2p_webrtc::tokio::Certificate::from_pem(&pem)
249                        .map_err(std::io::Error::other)?
250                }
251            };
252
253            let kp = keypair.clone();
254            let wrtc_tp = libp2p_webrtc::tokio::Transport::new(kp, cert);
255
256            wrtc_tp
257                .or_transport(transport)
258                .map(|either_output, _| match either_output {
259                    FutureEither::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
260                    FutureEither::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
261                })
262                .boxed()
263        }
264        false => transport.boxed(),
265    };
266
267    let transport = match enable_quic {
268        true => {
269            let mut quic_config = QuicConfig::new(&keypair);
270            quic_config.support_draft_29 = support_quic_draft_29;
271            quic_config.max_idle_timeout = quic_max_idle_timeout.as_millis() as _;
272            quic_config.keep_alive_interval = quic_keep_alive.unwrap_or(quic_max_idle_timeout / 2);
273            let quic_transport = TokioQuicTransport::new(quic_config);
274
275            OrTransport::new(quic_transport, transport)
276                .map(|either_output, _| match either_output {
277                    FutureEither::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
278                    FutureEither::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
279                })
280                .boxed()
281        }
282        false => transport,
283    };
284
285    Ok(transport)
286}
287
/// Builds the transport used on `wasm32` targets.
///
/// The stack starts from the in-memory transport, optionally combined with the browser
/// websocket transport (enabled when either `enable_websocket` or `enable_secure_websocket` is
/// set). It is then wrapped in a timeout, combined with the relay client transport when one is
/// supplied, and upgraded with noise authentication and yamux multiplexing. WebTransport and,
/// with the `webrtc_transport` feature, WebRTC are combined after the upgrade step.
///
/// # Errors
///
/// Returns an [`io::Error`] when the noise configuration cannot be derived from `keypair`.
#[cfg(target_arch = "wasm32")]
pub(crate) fn build_transport(
    keypair: identity::Keypair,
    relay: Option<ClientTransport>,
    TransportConfig {
        timeout,
        version,
        enable_websocket,
        enable_secure_websocket,
        enable_webrtc,
        enable_webtransport,
        ..
    }: TransportConfig,
) -> io::Result<TTransport> {
    use libp2p::websocket_websys;
    use libp2p::webtransport_websys;

    #[cfg(feature = "webrtc_transport")]
    use libp2p_webrtc_websys as webrtc_websys;

    let noise_config = noise::Config::new(&keypair).map_err(io::Error::other)?;
    let yamux_config = YamuxConfig::default();

    let transport = MemoryTransport::default();

    // Either websocket flag turns the browser websocket transport on.
    let transport = if enable_websocket || enable_secure_websocket {
        let ws_transport = websocket_websys::Transport::default();
        Either::Left(ws_transport.or_transport(transport))
    } else {
        Either::Right(transport)
    };

    let transport = TransportTimeout::new(transport, timeout);

    let transport = match relay {
        Some(relay) => Either::Left(OrTransport::new(relay, transport)),
        None => Either::Right(transport),
    };

    let transport = transport
        .upgrade(version.into())
        .authenticate(noise_config)
        .multiplex(yamux_config)
        .timeout(timeout)
        .boxed();

    let transport = if enable_webtransport {
        let config = webtransport_websys::Config::new(&keypair);
        let wtransport = webtransport_websys::Transport::new(config);
        wtransport
            .or_transport(transport)
            .map(|either_output, _| match either_output {
                FutureEither::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
                FutureEither::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
            })
            .boxed()
    } else {
        // Already boxed by the upgrade step above; boxing again would only add indirection.
        transport
    };

    #[cfg(feature = "webrtc_transport")]
    let transport = if enable_webrtc {
        let wrtc_transport = webrtc_websys::Transport::new(webrtc_websys::Config::new(&keypair));
        wrtc_transport
            .or_transport(transport)
            .map(|either_output, _| match either_output {
                FutureEither::Left((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
                FutureEither::Right((peer_id, muxer)) => (peer_id, StreamMuxerBox::new(muxer)),
            })
            .boxed()
    } else {
        transport
    };

    // Without the feature the flag is otherwise unread; touch it to avoid an unused warning.
    #[cfg(not(feature = "webrtc_transport"))]
    {
        _ = enable_webrtc;
    }

    Ok(transport)
}
374
375// borrow from libp2p SwarmBuilder
376#[cfg(not(target_arch = "wasm32"))]
377mod dual_transport {
378    use either::Either;
379    use futures::{
380        future::{self, MapOk},
381        TryFutureExt,
382    };
383    use libp2p::{
384        core::{
385            either::EitherFuture,
386            upgrade::{InboundConnectionUpgrade, OutboundConnectionUpgrade},
387            UpgradeInfo,
388        },
389        PeerId,
390    };
391    use std::iter::{Chain, Map};
392
393    #[derive(Debug, Clone)]
394    pub struct SelectSecurityUpgrade<A, B>(A, B);
395
396    impl<A, B> SelectSecurityUpgrade<A, B> {
397        /// Combines two upgrades into an `SelectUpgrade`.
398        ///
399        /// The protocols supported by the first element have a higher priority.
400        pub fn new(a: A, b: B) -> Self {
401            SelectSecurityUpgrade(a, b)
402        }
403    }
404
405    impl<A, B> UpgradeInfo for SelectSecurityUpgrade<A, B>
406    where
407        A: UpgradeInfo,
408        B: UpgradeInfo,
409    {
410        type Info = Either<A::Info, B::Info>;
411        type InfoIter = Chain<
412            Map<<A::InfoIter as IntoIterator>::IntoIter, fn(A::Info) -> Self::Info>,
413            Map<<B::InfoIter as IntoIterator>::IntoIter, fn(B::Info) -> Self::Info>,
414        >;
415
416        fn protocol_info(&self) -> Self::InfoIter {
417            let a = self
418                .0
419                .protocol_info()
420                .into_iter()
421                .map(Either::Left as fn(A::Info) -> _);
422            let b = self
423                .1
424                .protocol_info()
425                .into_iter()
426                .map(Either::Right as fn(B::Info) -> _);
427
428            a.chain(b)
429        }
430    }
431
432    impl<C, A, B, TA, TB, EA, EB> InboundConnectionUpgrade<C> for SelectSecurityUpgrade<A, B>
433    where
434        A: InboundConnectionUpgrade<C, Output = (PeerId, TA), Error = EA>,
435        B: InboundConnectionUpgrade<C, Output = (PeerId, TB), Error = EB>,
436    {
437        type Output = (PeerId, future::Either<TA, TB>);
438        type Error = Either<EA, EB>;
439        type Future = MapOk<
440            EitherFuture<A::Future, B::Future>,
441            fn(future::Either<(PeerId, TA), (PeerId, TB)>) -> (PeerId, future::Either<TA, TB>),
442        >;
443
444        fn upgrade_inbound(self, sock: C, info: Self::Info) -> Self::Future {
445            match info {
446                Either::Left(info) => EitherFuture::First(self.0.upgrade_inbound(sock, info)),
447                Either::Right(info) => EitherFuture::Second(self.1.upgrade_inbound(sock, info)),
448            }
449            .map_ok(future::Either::factor_first)
450        }
451    }
452
453    impl<C, A, B, TA, TB, EA, EB> OutboundConnectionUpgrade<C> for SelectSecurityUpgrade<A, B>
454    where
455        A: OutboundConnectionUpgrade<C, Output = (PeerId, TA), Error = EA>,
456        B: OutboundConnectionUpgrade<C, Output = (PeerId, TB), Error = EB>,
457    {
458        type Output = (PeerId, future::Either<TA, TB>);
459        type Error = Either<EA, EB>;
460        type Future = MapOk<
461            EitherFuture<A::Future, B::Future>,
462            fn(future::Either<(PeerId, TA), (PeerId, TB)>) -> (PeerId, future::Either<TA, TB>),
463        >;
464
465        fn upgrade_outbound(self, sock: C, info: Self::Info) -> Self::Future {
466            match info {
467                Either::Left(info) => EitherFuture::First(self.0.upgrade_outbound(sock, info)),
468                Either::Right(info) => EitherFuture::Second(self.1.upgrade_outbound(sock, info)),
469            }
470            .map_ok(future::Either::factor_first)
471        }
472    }
473}