libp2p_tcp/
lib.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the libp2p [`libp2p_core::Transport`] trait for TCP/IP.
22//!
23//! # Usage
24//!
25//! This crate provides [`tokio::Transport`]
26//! which implement the [`libp2p_core::Transport`] trait for use as a
27//! transport with `libp2p-core` or `libp2p-swarm`.
28
29#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
30
31mod provider;
32
33use std::{
34    collections::{HashSet, VecDeque},
35    io,
36    net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, TcpListener},
37    pin::Pin,
38    sync::{Arc, RwLock},
39    task::{Context, Poll, Waker},
40    time::Duration,
41};
42
43use futures::{future::Ready, prelude::*, stream::SelectAll};
44use futures_timer::Delay;
45use if_watch::IfEvent;
46use libp2p_core::{
47    multiaddr::{Multiaddr, Protocol},
48    transport::{DialOpts, ListenerId, PortUse, TransportError, TransportEvent},
49};
50#[cfg(feature = "tokio")]
51pub use provider::tokio;
52use provider::{Incoming, Provider};
53use socket2::{Domain, Socket, Type};
54
55/// The configuration for a TCP/IP transport capability for libp2p.
56#[derive(Clone, Debug)]
57pub struct Config {
58    /// TTL to set for opened sockets, or `None` to keep default.
59    ttl: Option<u32>,
60    /// `TCP_NODELAY` to set for opened sockets.
61    nodelay: bool,
62    /// Size of the listen backlog for listen sockets.
63    backlog: u32,
64}
65
66type Port = u16;
67
68/// The configuration for port reuse of listening sockets.
69#[derive(Debug, Clone, Default)]
70struct PortReuse {
71    /// The addresses and ports of the listening sockets
72    /// registered as eligible for port reuse when dialing
73    listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>,
74}
75
76impl PortReuse {
77    /// Registers a socket address for port reuse.
78    ///
79    /// Has no effect if port reuse is disabled.
80    fn register(&mut self, ip: IpAddr, port: Port) {
81        tracing::trace!(%ip, %port, "Registering for port reuse");
82        self.listen_addrs
83            .write()
84            .expect("`register()` and `unregister()` never panic while holding the lock")
85            .insert((ip, port));
86    }
87
88    /// Unregisters a socket address for port reuse.
89    ///
90    /// Has no effect if port reuse is disabled.
91    fn unregister(&mut self, ip: IpAddr, port: Port) {
92        tracing::trace!(%ip, %port, "Unregistering for port reuse");
93        self.listen_addrs
94            .write()
95            .expect("`register()` and `unregister()` never panic while holding the lock")
96            .remove(&(ip, port));
97    }
98
99    /// Selects a listening socket address suitable for use
100    /// as the local socket address when dialing.
101    ///
102    /// If multiple listening sockets are registered for port
103    /// reuse, one is chosen whose IP protocol version and
104    /// loopback status is the same as that of `remote_ip`.
105    ///
106    /// Returns `None` if port reuse is disabled or no suitable
107    /// listening socket address is found.
108    fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
109        for (ip, port) in self
110            .listen_addrs
111            .read()
112            .expect("`local_dial_addr` never panic while holding the lock")
113            .iter()
114        {
115            if ip.is_ipv4() == remote_ip.is_ipv4() && ip.is_loopback() == remote_ip.is_loopback() {
116                if remote_ip.is_ipv4() {
117                    return Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), *port));
118                } else {
119                    return Some(SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), *port));
120                }
121            }
122        }
123
124        None
125    }
126}
127
128impl Config {
129    /// Creates a new configuration for a TCP/IP transport:
130    ///
131    ///   * Nagle's algorithm is _disabled_, i.e. `TCP_NODELAY` _enabled_. See [`Config::nodelay`].
132    ///   * Reuse of listening ports is _disabled_. See [`Config::port_reuse`].
133    ///   * No custom `IP_TTL` is set. The default of the OS TCP stack applies. See [`Config::ttl`].
134    ///   * The size of the listen backlog for new listening sockets is `1024`. See
135    ///     [`Config::listen_backlog`].
136    pub fn new() -> Self {
137        Self {
138            ttl: None,
139            nodelay: true, // Disable Nagle's algorithm by default.
140            backlog: 1024,
141        }
142    }
143
144    /// Configures the `IP_TTL` option for new sockets.
145    pub fn ttl(mut self, value: u32) -> Self {
146        self.ttl = Some(value);
147        self
148    }
149
150    /// Configures the `TCP_NODELAY` option for new sockets.
151    pub fn nodelay(mut self, value: bool) -> Self {
152        self.nodelay = value;
153        self
154    }
155
156    /// Configures the listen backlog for new listen sockets.
157    pub fn listen_backlog(mut self, backlog: u32) -> Self {
158        self.backlog = backlog;
159        self
160    }
161
162    /// Configures port reuse for local sockets, which implies
163    /// reuse of listening ports for outgoing connections to
164    /// enhance NAT traversal capabilities.
165    ///
166    /// # Deprecation Notice
167    ///
168    /// The new implementation works on a per-connaction basis, defined by the behaviour. This
169    /// removes the necessaity to configure the transport for port reuse, instead the behaviour
170    /// requiring this behaviour can decide whether to use port reuse or not.
171    ///
172    /// The API to configure port reuse is part of [`Transport`] and the option can be found in
173    /// [`libp2p_core::transport::DialOpts`].
174    ///
175    /// If [`PortUse::Reuse`] is enabled, the transport will try to reuse the local port of the
176    /// listener. If that's not possible, i.e. there is no listener or the transport doesn't allow
177    /// a direct control over ports, a new port (or the default behaviour) is used. If port reuse
178    /// is enabled for a connection, this option will be treated on a best-effor basis.
179    #[deprecated(
180        since = "0.42.0",
181        note = "This option does nothing now, since the port reuse policy is now decided on a per-connection basis by the behaviour. The function will be removed in a future release."
182    )]
183    pub fn port_reuse(self, _port_reuse: bool) -> Self {
184        self
185    }
186
187    fn create_socket(&self, socket_addr: SocketAddr, port_use: PortUse) -> io::Result<Socket> {
188        let socket = Socket::new(
189            Domain::for_address(socket_addr),
190            Type::STREAM,
191            Some(socket2::Protocol::TCP),
192        )?;
193        if socket_addr.is_ipv6() {
194            socket.set_only_v6(true)?;
195        }
196        if let Some(ttl) = self.ttl {
197            socket.set_ttl(ttl)?;
198        }
199        socket.set_nodelay(self.nodelay)?;
200        socket.set_reuse_address(true)?;
201        #[cfg(all(unix, not(any(target_os = "solaris", target_os = "illumos"))))]
202        if port_use == PortUse::Reuse {
203            socket.set_reuse_port(true)?;
204        }
205
206        #[cfg(not(all(unix, not(any(target_os = "solaris", target_os = "illumos")))))]
207        let _ = port_use; // silence the unused warning on non-unix platforms (i.e. Windows)
208
209        socket.set_nonblocking(true)?;
210
211        Ok(socket)
212    }
213}
214
215impl Default for Config {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
221/// An abstract [`libp2p_core::Transport`] implementation.
222///
223/// You shouldn't need to use this type directly. Use one of the following instead:
224///
225/// - [`tokio::Transport`]
226pub struct Transport<T>
227where
228    T: Provider + Send,
229{
230    config: Config,
231
232    /// The configuration of port reuse when dialing.
233    port_reuse: PortReuse,
234    /// All the active listeners.
235    /// The [`ListenStream`] struct contains a stream that we want to be pinned. Since the
236    /// `VecDeque` can be resized, the only way is to use a `Pin<Box<>>`.
237    listeners: SelectAll<ListenStream<T>>,
238    /// Pending transport events to return from [`libp2p_core::Transport::poll`].
239    pending_events:
240        VecDeque<TransportEvent<<Self as libp2p_core::Transport>::ListenerUpgrade, io::Error>>,
241}
242
243impl<T> Transport<T>
244where
245    T: Provider + Send,
246{
247    /// Create a new instance of [`Transport`].
248    ///
249    /// If you don't want to specify a [`Config`], use [`Transport::default`].
250    ///
251    /// It is best to call this function through one of the type-aliases of this type:
252    ///
253    /// - [`tokio::Transport::new`]
254    pub fn new(config: Config) -> Self {
255        Transport {
256            config,
257            ..Default::default()
258        }
259    }
260
261    fn do_listen(
262        &mut self,
263        id: ListenerId,
264        socket_addr: SocketAddr,
265    ) -> io::Result<ListenStream<T>> {
266        let socket = self.config.create_socket(socket_addr, PortUse::Reuse)?;
267        socket.bind(&socket_addr.into())?;
268        socket.listen(self.config.backlog as _)?;
269        socket.set_nonblocking(true)?;
270        let listener: TcpListener = socket.into();
271        let local_addr = listener.local_addr()?;
272
273        if local_addr.ip().is_unspecified() {
274            return ListenStream::<T>::new(
275                id,
276                listener,
277                Some(T::new_if_watcher()?),
278                self.port_reuse.clone(),
279            );
280        }
281
282        self.port_reuse.register(local_addr.ip(), local_addr.port());
283        let listen_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
284        self.pending_events.push_back(TransportEvent::NewAddress {
285            listener_id: id,
286            listen_addr,
287        });
288        ListenStream::<T>::new(id, listener, None, self.port_reuse.clone())
289    }
290}
291
292impl<T> Default for Transport<T>
293where
294    T: Provider + Send,
295{
296    /// Creates a [`Transport`] with reasonable defaults.
297    ///
298    /// This transport will have port-reuse disabled.
299    fn default() -> Self {
300        Transport {
301            port_reuse: PortReuse::default(),
302            config: Config::default(),
303            listeners: SelectAll::new(),
304            pending_events: VecDeque::new(),
305        }
306    }
307}
308
309impl<T> libp2p_core::Transport for Transport<T>
310where
311    T: Provider + Send + 'static,
312    T::Listener: Unpin,
313    T::Stream: Unpin,
314{
315    type Output = T::Stream;
316    type Error = io::Error;
317    type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
318    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
319
320    fn listen_on(
321        &mut self,
322        id: ListenerId,
323        addr: Multiaddr,
324    ) -> Result<(), TransportError<Self::Error>> {
325        let socket_addr = multiaddr_to_socketaddr(addr.clone())
326            .map_err(|_| TransportError::MultiaddrNotSupported(addr))?;
327        tracing::debug!("listening on {}", socket_addr);
328        let listener = self
329            .do_listen(id, socket_addr)
330            .map_err(TransportError::Other)?;
331        self.listeners.push(listener);
332        Ok(())
333    }
334
335    fn remove_listener(&mut self, id: ListenerId) -> bool {
336        if let Some(listener) = self.listeners.iter_mut().find(|l| l.listener_id == id) {
337            listener.close(Ok(()));
338            true
339        } else {
340            false
341        }
342    }
343
344    fn dial(
345        &mut self,
346        addr: Multiaddr,
347        opts: DialOpts,
348    ) -> Result<Self::Dial, TransportError<Self::Error>> {
349        let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(addr.clone()) {
350            if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
351                return Err(TransportError::MultiaddrNotSupported(addr));
352            }
353            socket_addr
354        } else {
355            return Err(TransportError::MultiaddrNotSupported(addr));
356        };
357        tracing::debug!(address=%socket_addr, "dialing address");
358
359        let socket = self
360            .config
361            .create_socket(socket_addr, opts.port_use)
362            .map_err(TransportError::Other)?;
363
364        let bind_addr = match self.port_reuse.local_dial_addr(&socket_addr.ip()) {
365            Some(socket_addr) if opts.port_use == PortUse::Reuse => {
366                tracing::trace!(address=%addr, "Binding dial socket to listen socket address");
367                Some(socket_addr)
368            }
369            _ => None,
370        };
371
372        let local_config = self.config.clone();
373
374        Ok(async move {
375            if let Some(bind_addr) = bind_addr {
376                socket.bind(&bind_addr.into())?;
377            }
378
379            // [`Transport::dial`] should do no work unless the returned [`Future`] is polled. Thus
380            // do the `connect` call within the [`Future`].
381            let socket = match (socket.connect(&socket_addr.into()), bind_addr) {
382                (Ok(()), _) => socket,
383                (Err(err), _) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
384                (Err(err), _) if err.kind() == io::ErrorKind::WouldBlock => socket,
385                (Err(err), Some(bind_addr)) if err.kind() == io::ErrorKind::AddrNotAvailable  => {
386                    // The socket was bound to a local address that is no longer available.
387                    // Retry without binding.
388                    tracing::debug!(connect_addr = %socket_addr, ?bind_addr, "Failed to connect using existing socket because we already have a connection, re-dialing with new port");
389                    std::mem::drop(socket);
390                    let socket = local_config.create_socket(socket_addr, PortUse::New)?;
391                    match socket.connect(&socket_addr.into()) {
392                        Ok(()) => socket,
393                        Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => socket,
394                        Err(err) if err.kind() == io::ErrorKind::WouldBlock => socket,
395                        Err(err) => return Err(err),
396                    }
397                }
398                (Err(err), _) => return Err(err),
399            };
400
401            let stream = T::new_stream(socket.into()).await?;
402            Ok(stream)
403        }
404        .boxed())
405    }
406
407    /// Poll all listeners.
408    #[tracing::instrument(level = "trace", name = "Transport::poll", skip(self, cx))]
409    fn poll(
410        mut self: Pin<&mut Self>,
411        cx: &mut Context<'_>,
412    ) -> Poll<TransportEvent<Self::ListenerUpgrade, Self::Error>> {
413        // Return pending events from closed listeners.
414        if let Some(event) = self.pending_events.pop_front() {
415            return Poll::Ready(event);
416        }
417
418        match self.listeners.poll_next_unpin(cx) {
419            Poll::Ready(Some(transport_event)) => Poll::Ready(transport_event),
420            _ => Poll::Pending,
421        }
422    }
423}
424
425/// A stream of incoming connections on one or more interfaces.
426struct ListenStream<T>
427where
428    T: Provider,
429{
430    /// The ID of this listener.
431    listener_id: ListenerId,
432    /// The socket address that the listening socket is bound to,
433    /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
434    /// when listening on all interfaces for IPv4 respectively IPv6 connections.
435    listen_addr: SocketAddr,
436    /// The async listening socket for incoming connections.
437    listener: T::Listener,
438    /// Watcher for network interface changes.
439    /// Reports [`IfEvent`]s for new / deleted ip-addresses when interfaces
440    /// become or stop being available.
441    ///
442    /// `None` if the socket is only listening on a single interface.
443    if_watcher: Option<T::IfWatcher>,
444    /// The port reuse configuration for outgoing connections.
445    ///
446    /// If enabled, all IP addresses on which this listening stream
447    /// is accepting connections (`in_addr`) are registered for reuse
448    /// as local addresses for the sockets of outgoing connections. They are
449    /// unregistered when the stream encounters an error or is dropped.
450    port_reuse: PortReuse,
451    /// How long to sleep after a (non-fatal) error while trying
452    /// to accept a new connection.
453    sleep_on_error: Duration,
454    /// The current pause, if any.
455    pause: Option<Delay>,
456    /// Pending event to reported.
457    pending_event: Option<<Self as Stream>::Item>,
458    /// The listener can be manually closed with
459    /// [`Transport::remove_listener`](libp2p_core::Transport::remove_listener).
460    is_closed: bool,
461    /// The stream must be awaken after it has been closed to deliver the last event.
462    close_listener_waker: Option<Waker>,
463}
464
465impl<T> ListenStream<T>
466where
467    T: Provider,
468{
469    /// Constructs a [`ListenStream`] for incoming connections around
470    /// the given [`TcpListener`].
471    fn new(
472        listener_id: ListenerId,
473        listener: TcpListener,
474        if_watcher: Option<T::IfWatcher>,
475        port_reuse: PortReuse,
476    ) -> io::Result<Self> {
477        let listen_addr = listener.local_addr()?;
478        let listener = T::new_listener(listener)?;
479
480        Ok(ListenStream {
481            port_reuse,
482            listener,
483            listener_id,
484            listen_addr,
485            if_watcher,
486            pause: None,
487            sleep_on_error: Duration::from_millis(100),
488            pending_event: None,
489            is_closed: false,
490            close_listener_waker: None,
491        })
492    }
493
494    /// Disables port reuse for any listen address of this stream.
495    ///
496    /// This is done when the [`ListenStream`] encounters a fatal
497    /// error (for the stream) or is dropped.
498    ///
499    /// Has no effect if port reuse is disabled.
500    fn disable_port_reuse(&mut self) {
501        match &self.if_watcher {
502            Some(if_watcher) => {
503                for ip_net in T::addrs(if_watcher) {
504                    self.port_reuse
505                        .unregister(ip_net.addr(), self.listen_addr.port());
506                }
507            }
508            None => self
509                .port_reuse
510                .unregister(self.listen_addr.ip(), self.listen_addr.port()),
511        }
512    }
513
514    /// Close the listener.
515    ///
516    /// This will create a [`TransportEvent::ListenerClosed`] and
517    /// terminate the stream once the event has been reported.
518    fn close(&mut self, reason: Result<(), io::Error>) {
519        if self.is_closed {
520            return;
521        }
522        self.pending_event = Some(TransportEvent::ListenerClosed {
523            listener_id: self.listener_id,
524            reason,
525        });
526        self.is_closed = true;
527
528        // Wake the stream to deliver the last event.
529        if let Some(waker) = self.close_listener_waker.take() {
530            waker.wake();
531        }
532    }
533
534    /// Poll for a next If Event.
535    fn poll_if_addr(&mut self, cx: &mut Context<'_>) -> Poll<<Self as Stream>::Item> {
536        let Some(if_watcher) = self.if_watcher.as_mut() else {
537            return Poll::Pending;
538        };
539
540        let my_listen_addr_port = self.listen_addr.port();
541
542        while let Poll::Ready(Some(event)) = if_watcher.poll_next_unpin(cx) {
543            match event {
544                Ok(IfEvent::Up(inet)) => {
545                    let ip = inet.addr();
546                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
547                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
548                        tracing::debug!(address=%ma, "New listen address");
549                        self.port_reuse.register(ip, my_listen_addr_port);
550                        return Poll::Ready(TransportEvent::NewAddress {
551                            listener_id: self.listener_id,
552                            listen_addr: ma,
553                        });
554                    }
555                }
556                Ok(IfEvent::Down(inet)) => {
557                    let ip = inet.addr();
558                    if self.listen_addr.is_ipv4() == ip.is_ipv4() {
559                        let ma = ip_to_multiaddr(ip, my_listen_addr_port);
560                        tracing::debug!(address=%ma, "Expired listen address");
561                        self.port_reuse.unregister(ip, my_listen_addr_port);
562                        return Poll::Ready(TransportEvent::AddressExpired {
563                            listener_id: self.listener_id,
564                            listen_addr: ma,
565                        });
566                    }
567                }
568                Err(error) => {
569                    self.pause = Some(Delay::new(self.sleep_on_error));
570                    return Poll::Ready(TransportEvent::ListenerError {
571                        listener_id: self.listener_id,
572                        error,
573                    });
574                }
575            }
576        }
577
578        Poll::Pending
579    }
580}
581
582impl<T> Drop for ListenStream<T>
583where
584    T: Provider,
585{
586    fn drop(&mut self) {
587        self.disable_port_reuse();
588    }
589}
590
591impl<T> Stream for ListenStream<T>
592where
593    T: Provider,
594    T::Listener: Unpin,
595    T::Stream: Unpin,
596{
597    type Item = TransportEvent<Ready<Result<T::Stream, io::Error>>, io::Error>;
598
599    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
600        if let Some(mut pause) = self.pause.take() {
601            match pause.poll_unpin(cx) {
602                Poll::Ready(_) => {}
603                Poll::Pending => {
604                    self.pause = Some(pause);
605                    return Poll::Pending;
606                }
607            }
608        }
609
610        if let Some(event) = self.pending_event.take() {
611            return Poll::Ready(Some(event));
612        }
613
614        if self.is_closed {
615            // Terminate the stream if the listener closed
616            // and all remaining events have been reported.
617            return Poll::Ready(None);
618        }
619
620        if let Poll::Ready(event) = self.poll_if_addr(cx) {
621            return Poll::Ready(Some(event));
622        }
623
624        // Take the pending connection from the backlog.
625        match T::poll_accept(&mut self.listener, cx) {
626            Poll::Ready(Ok(Incoming {
627                local_addr,
628                remote_addr,
629                stream,
630            })) => {
631                let local_addr = ip_to_multiaddr(local_addr.ip(), local_addr.port());
632                let remote_addr = ip_to_multiaddr(remote_addr.ip(), remote_addr.port());
633
634                tracing::debug!(
635                    remote_address=%remote_addr,
636                    local_address=%local_addr,
637                    "Incoming connection from remote at local"
638                );
639
640                return Poll::Ready(Some(TransportEvent::Incoming {
641                    listener_id: self.listener_id,
642                    upgrade: future::ok(stream),
643                    local_addr,
644                    send_back_addr: remote_addr,
645                }));
646            }
647            Poll::Ready(Err(error)) => {
648                // These errors are non-fatal for the listener stream.
649                self.pause = Some(Delay::new(self.sleep_on_error));
650                return Poll::Ready(Some(TransportEvent::ListenerError {
651                    listener_id: self.listener_id,
652                    error,
653                }));
654            }
655            Poll::Pending => {}
656        }
657
658        self.close_listener_waker = Some(cx.waker().clone());
659        Poll::Pending
660    }
661}
662
663/// Extracts a `SocketAddr` from a given `Multiaddr`.
664///
665/// Fails if the given `Multiaddr` does not begin with an IP
666/// protocol encapsulating a TCP port.
667fn multiaddr_to_socketaddr(mut addr: Multiaddr) -> Result<SocketAddr, ()> {
668    // "Pop" the IP address and TCP port from the end of the address,
669    // ignoring a `/p2p/...` suffix as well as any prefix of possibly
670    // outer protocols, if present.
671    let mut port = None;
672    while let Some(proto) = addr.pop() {
673        match proto {
674            Protocol::Ip4(ipv4) => match port {
675                Some(port) => return Ok(SocketAddr::new(ipv4.into(), port)),
676                None => return Err(()),
677            },
678            Protocol::Ip6(ipv6) => match port {
679                Some(port) => return Ok(SocketAddr::new(ipv6.into(), port)),
680                None => return Err(()),
681            },
682            Protocol::Tcp(portnum) => match port {
683                Some(_) => return Err(()),
684                None => port = Some(portnum),
685            },
686            Protocol::P2p(_) => {}
687            _ => return Err(()),
688        }
689    }
690    Err(())
691}
692
693// Create a [`Multiaddr`] from the given IP address and port number.
694fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
695    Multiaddr::empty().with(ip.into()).with(Protocol::Tcp(port))
696}
697
698#[cfg(all(test, feature = "tokio"))]
699mod tests {
700    use futures::{
701        channel::{mpsc, oneshot},
702        future::poll_fn,
703    };
704    use libp2p_core::{Endpoint, Transport as _};
705
706    use super::*;
707
708    #[test]
709    fn multiaddr_to_tcp_conversion() {
710        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
711
712        assert!(
713            multiaddr_to_socketaddr("/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
714                .is_err()
715        );
716
717        assert_eq!(
718            multiaddr_to_socketaddr("/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
719            Ok(SocketAddr::new(
720                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
721                12345,
722            ))
723        );
724        assert_eq!(
725            multiaddr_to_socketaddr(
726                "/ip4/255.255.255.255/tcp/8080"
727                    .parse::<Multiaddr>()
728                    .unwrap()
729            ),
730            Ok(SocketAddr::new(
731                IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
732                8080,
733            ))
734        );
735        assert_eq!(
736            multiaddr_to_socketaddr("/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
737            Ok(SocketAddr::new(
738                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
739                12345,
740            ))
741        );
742        assert_eq!(
743            multiaddr_to_socketaddr(
744                "/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
745                    .parse::<Multiaddr>()
746                    .unwrap()
747            ),
748            Ok(SocketAddr::new(
749                IpAddr::V6(Ipv6Addr::new(
750                    65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
751                )),
752                8080,
753            ))
754        );
755    }
756
757    #[test]
758    fn communicating_between_dialer_and_listener() {
759        let _ = tracing_subscriber::fmt()
760            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
761            .try_init();
762
763        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
764            let mut tcp = Transport::<T>::default().boxed();
765            tcp.listen_on(ListenerId::next(), addr).unwrap();
766            loop {
767                match tcp.select_next_some().await {
768                    TransportEvent::NewAddress { listen_addr, .. } => {
769                        ready_tx.send(listen_addr).await.unwrap();
770                    }
771                    TransportEvent::Incoming { upgrade, .. } => {
772                        let mut upgrade = upgrade.await.unwrap();
773                        let mut buf = [0u8; 3];
774                        upgrade.read_exact(&mut buf).await.unwrap();
775                        assert_eq!(buf, [1, 2, 3]);
776                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
777                        return;
778                    }
779                    e => panic!("Unexpected transport event: {e:?}"),
780                }
781            }
782        }
783
784        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
785            let addr = ready_rx.next().await.unwrap();
786            let mut tcp = Transport::<T>::default();
787
788            // Obtain a future socket through dialing
789            let mut socket = tcp
790                .dial(
791                    addr.clone(),
792                    DialOpts {
793                        role: Endpoint::Dialer,
794                        port_use: PortUse::Reuse,
795                    },
796                )
797                .unwrap()
798                .await
799                .unwrap();
800            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
801
802            let mut buf = [0u8; 3];
803            socket.read_exact(&mut buf).await.unwrap();
804            assert_eq!(buf, [4, 5, 6]);
805        }
806
807        fn test(addr: Multiaddr) {
808            let (ready_tx, ready_rx) = mpsc::channel(1);
809            let listener = listener::<tokio::Tcp>(addr, ready_tx);
810            let dialer = dialer::<tokio::Tcp>(ready_rx);
811            let rt = ::tokio::runtime::Builder::new_current_thread()
812                .enable_io()
813                .build()
814                .unwrap();
815            let tasks = ::tokio::task::LocalSet::new();
816            let listener = tasks.spawn_local(listener);
817            tasks.block_on(&rt, dialer);
818            tasks.block_on(&rt, listener).unwrap();
819        }
820
821        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
822        test("/ip6/::1/tcp/0".parse().unwrap());
823    }
824
825    #[test]
826    fn wildcard_expansion() {
827        let _ = tracing_subscriber::fmt()
828            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
829            .try_init();
830
831        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
832            let mut tcp = Transport::<T>::default().boxed();
833            tcp.listen_on(ListenerId::next(), addr).unwrap();
834
835            loop {
836                match tcp.select_next_some().await {
837                    TransportEvent::NewAddress { listen_addr, .. } => {
838                        let mut iter = listen_addr.iter();
839                        match iter.next().expect("ip address") {
840                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
841                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
842                            other => panic!("Unexpected protocol: {other}"),
843                        }
844                        if let Protocol::Tcp(port) = iter.next().expect("port") {
845                            assert_ne!(0, port)
846                        } else {
847                            panic!("No TCP port in address: {listen_addr}")
848                        }
849                        ready_tx.send(listen_addr).await.ok();
850                    }
851                    TransportEvent::Incoming { .. } => {
852                        return;
853                    }
854                    _ => {}
855                }
856            }
857        }
858
859        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
860            let dest_addr = ready_rx.next().await.unwrap();
861            let mut tcp = Transport::<T>::default();
862            tcp.dial(
863                dest_addr,
864                DialOpts {
865                    role: Endpoint::Dialer,
866                    port_use: PortUse::New,
867                },
868            )
869            .unwrap()
870            .await
871            .unwrap();
872        }
873
874        fn test(addr: Multiaddr) {
875            let (ready_tx, ready_rx) = mpsc::channel(1);
876            let listener = listener::<tokio::Tcp>(addr, ready_tx);
877            let dialer = dialer::<tokio::Tcp>(ready_rx);
878            let rt = ::tokio::runtime::Builder::new_current_thread()
879                .enable_io()
880                .build()
881                .unwrap();
882            let tasks = ::tokio::task::LocalSet::new();
883            let listener = tasks.spawn_local(listener);
884            tasks.block_on(&rt, dialer);
885            tasks.block_on(&rt, listener).unwrap();
886        }
887
888        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
889        test("/ip6/::1/tcp/0".parse().unwrap());
890    }
891
892    #[test]
893    fn port_reuse_dialing() {
894        let _ = tracing_subscriber::fmt()
895            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
896            .try_init();
897
898        async fn listener<T: Provider>(
899            addr: Multiaddr,
900            mut ready_tx: mpsc::Sender<Multiaddr>,
901            port_reuse_rx: oneshot::Receiver<Protocol<'_>>,
902        ) {
903            let mut tcp = Transport::<T>::new(Config::new()).boxed();
904            tcp.listen_on(ListenerId::next(), addr).unwrap();
905            loop {
906                match tcp.select_next_some().await {
907                    TransportEvent::NewAddress { listen_addr, .. } => {
908                        ready_tx.send(listen_addr).await.ok();
909                    }
910                    TransportEvent::Incoming {
911                        upgrade,
912                        mut send_back_addr,
913                        ..
914                    } => {
915                        // Receive the dialer tcp port reuse
916                        let remote_port_reuse = port_reuse_rx.await.unwrap();
917                        // And check it is the same as the remote port used for upgrade
918                        assert_eq!(send_back_addr.pop().unwrap(), remote_port_reuse);
919
920                        let mut upgrade = upgrade.await.unwrap();
921                        let mut buf = [0u8; 3];
922                        upgrade.read_exact(&mut buf).await.unwrap();
923                        assert_eq!(buf, [1, 2, 3]);
924                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
925                        return;
926                    }
927                    e => panic!("Unexpected event: {e:?}"),
928                }
929            }
930        }
931
932        async fn dialer<T: Provider>(
933            addr: Multiaddr,
934            mut ready_rx: mpsc::Receiver<Multiaddr>,
935            port_reuse_tx: oneshot::Sender<Protocol<'_>>,
936        ) {
937            let dest_addr = ready_rx.next().await.unwrap();
938            let mut tcp = Transport::<T>::new(Config::new());
939            tcp.listen_on(ListenerId::next(), addr).unwrap();
940            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
941                TransportEvent::NewAddress { .. } => {
942                    // Check that tcp and listener share the same port reuse SocketAddr
943                    let listener = tcp.listeners.iter().next().unwrap();
944                    let port_reuse_tcp = tcp.port_reuse.local_dial_addr(&listener.listen_addr.ip());
945                    let port_reuse_listener = listener
946                        .port_reuse
947                        .local_dial_addr(&listener.listen_addr.ip());
948                    assert!(port_reuse_tcp.is_some());
949                    assert_eq!(port_reuse_tcp, port_reuse_listener);
950
951                    // Send the dialer tcp port reuse to the listener
952                    port_reuse_tx
953                        .send(Protocol::Tcp(port_reuse_tcp.unwrap().port()))
954                        .ok();
955
956                    // Obtain a future socket through dialing
957                    let mut socket = tcp
958                        .dial(
959                            dest_addr,
960                            DialOpts {
961                                role: Endpoint::Dialer,
962                                port_use: PortUse::Reuse,
963                            },
964                        )
965                        .unwrap()
966                        .await
967                        .unwrap();
968                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
969                    // socket.flush().await;
970                    let mut buf = [0u8; 3];
971                    socket.read_exact(&mut buf).await.unwrap();
972                    assert_eq!(buf, [4, 5, 6]);
973                }
974                e => panic!("Unexpected transport event: {e:?}"),
975            }
976        }
977
978        fn test(addr: Multiaddr) {
979            let (ready_tx, ready_rx) = mpsc::channel(1);
980            let (port_reuse_tx, port_reuse_rx) = oneshot::channel();
981            let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx, port_reuse_rx);
982            let dialer = dialer::<tokio::Tcp>(addr, ready_rx, port_reuse_tx);
983            let rt = ::tokio::runtime::Builder::new_current_thread()
984                .enable_io()
985                .build()
986                .unwrap();
987            let tasks = ::tokio::task::LocalSet::new();
988            let listener = tasks.spawn_local(listener);
989            tasks.block_on(&rt, dialer);
990            tasks.block_on(&rt, listener).unwrap();
991        }
992
993        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
994        test("/ip6/::1/tcp/0".parse().unwrap());
995    }
996
997    #[test]
998    fn port_reuse_listening() {
999        let _ = tracing_subscriber::fmt()
1000            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1001            .try_init();
1002
1003        async fn listen_twice<T: Provider>(addr: Multiaddr) {
1004            let mut tcp = Transport::<T>::new(Config::new());
1005            tcp.listen_on(ListenerId::next(), addr).unwrap();
1006            match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1007                TransportEvent::NewAddress {
1008                    listen_addr: addr1, ..
1009                } => {
1010                    let listener1 = tcp.listeners.iter().next().unwrap();
1011                    let port_reuse_tcp =
1012                        tcp.port_reuse.local_dial_addr(&listener1.listen_addr.ip());
1013                    let port_reuse_listener1 = listener1
1014                        .port_reuse
1015                        .local_dial_addr(&listener1.listen_addr.ip());
1016                    assert!(port_reuse_tcp.is_some());
1017                    assert_eq!(port_reuse_tcp, port_reuse_listener1);
1018
1019                    // Listen on the same address a second time.
1020                    tcp.listen_on(ListenerId::next(), addr1.clone()).unwrap();
1021                    match poll_fn(|cx| Pin::new(&mut tcp).poll(cx)).await {
1022                        TransportEvent::NewAddress {
1023                            listen_addr: addr2, ..
1024                        } => assert_eq!(addr1, addr2),
1025                        e => panic!("Unexpected transport event: {e:?}"),
1026                    }
1027                }
1028                e => panic!("Unexpected transport event: {e:?}"),
1029            }
1030        }
1031
1032        fn test(addr: Multiaddr) {
1033            let listener = listen_twice::<tokio::Tcp>(addr);
1034            let rt = ::tokio::runtime::Builder::new_current_thread()
1035                .enable_io()
1036                .build()
1037                .unwrap();
1038            rt.block_on(listener);
1039        }
1040
1041        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1042    }
1043
1044    #[test]
1045    fn listen_port_0() {
1046        let _ = tracing_subscriber::fmt()
1047            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1048            .try_init();
1049
1050        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
1051            let mut tcp = Transport::<T>::default().boxed();
1052            tcp.listen_on(ListenerId::next(), addr).unwrap();
1053            tcp.select_next_some()
1054                .await
1055                .into_new_address()
1056                .expect("listen address")
1057        }
1058
1059        fn test(addr: Multiaddr) {
1060            let rt = ::tokio::runtime::Builder::new_current_thread()
1061                .enable_io()
1062                .build()
1063                .unwrap();
1064            let new_addr = rt.block_on(listen::<tokio::Tcp>(addr));
1065            assert!(!new_addr.to_string().contains("tcp/0"));
1066        }
1067
1068        test("/ip6/::1/tcp/0".parse().unwrap());
1069        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1070    }
1071
1072    #[test]
1073    fn listen_invalid_addr() {
1074        let _ = tracing_subscriber::fmt()
1075            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1076            .try_init();
1077
1078        fn test(addr: Multiaddr) {
1079            let mut tcp = tokio::Transport::default();
1080            assert!(tcp.listen_on(ListenerId::next(), addr).is_err());
1081        }
1082
1083        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
1084    }
1085
1086    #[test]
1087    fn test_remove_listener() {
1088        let _ = tracing_subscriber::fmt()
1089            .with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
1090            .try_init();
1091
1092        async fn cycle_listeners<T: Provider>() -> bool {
1093            let mut tcp = Transport::<T>::default().boxed();
1094            let listener_id = ListenerId::next();
1095            tcp.listen_on(listener_id, "/ip4/127.0.0.1/tcp/0".parse().unwrap())
1096                .unwrap();
1097            tcp.remove_listener(listener_id)
1098        }
1099
1100        #[cfg(feature = "tokio")]
1101        {
1102            let rt = ::tokio::runtime::Builder::new_current_thread()
1103                .enable_io()
1104                .build()
1105                .unwrap();
1106            assert!(rt.block_on(cycle_listeners::<tokio::Tcp>()));
1107        }
1108    }
1109
1110    #[test]
1111    fn test_listens_ipv4_ipv6_separately() {
1112        fn test<T: Provider>() {
1113            let port = {
1114                let listener = TcpListener::bind("127.0.0.1:0").unwrap();
1115                listener.local_addr().unwrap().port()
1116            };
1117            let mut tcp = Transport::<T>::default().boxed();
1118            let listener_id = ListenerId::next();
1119            tcp.listen_on(
1120                listener_id,
1121                format!("/ip4/0.0.0.0/tcp/{port}").parse().unwrap(),
1122            )
1123            .unwrap();
1124            tcp.listen_on(
1125                ListenerId::next(),
1126                format!("/ip6/::/tcp/{port}").parse().unwrap(),
1127            )
1128            .unwrap();
1129        }
1130        #[cfg(feature = "tokio")]
1131        {
1132            let rt = ::tokio::runtime::Builder::new_current_thread()
1133                .enable_io()
1134                .build()
1135                .unwrap();
1136            rt.block_on(async {
1137                test::<tokio::Tcp>();
1138            });
1139        }
1140    }
1141}