tet_libp2p_tcp/
lib.rs

1// Copyright 2017 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Implementation of the libp2p `Transport` trait for TCP/IP.
22//!
23//! # Usage
24//!
25//! This crate provides a `TcpConfig` and `TokioTcpConfig`, depending on
26//! the enabled features, which implement the `Transport` trait for use as a
27//! transport with `tet-libp2p-core` or `tet-libp2p-swarm`.
28
29mod provider;
30
31#[cfg(feature = "async-io")]
32pub use provider::async_io;
33
/// The type of a [`GenTcpConfig`] using the `async-io` implementation.
///
/// Only available when the `async-io` feature is enabled.
#[cfg(feature = "async-io")]
pub type TcpConfig = GenTcpConfig<async_io::Tcp>;
37
38#[cfg(feature = "tokio")]
39pub use provider::tokio;
40
/// The type of a [`GenTcpConfig`] using the `tokio` implementation.
///
/// Only available when the `tokio` feature is enabled.
#[cfg(feature = "tokio")]
pub type TokioTcpConfig = GenTcpConfig<tokio::Tcp>;
44
45use futures::{
46    future::{self, BoxFuture, Ready},
47    prelude::*,
48    ready,
49};
50use futures_timer::Delay;
51use tet_libp2p_core::{
52    address_translation,
53    multiaddr::{Multiaddr, Protocol},
54    transport::{ListenerEvent, Transport, TransportError},
55};
56use socket2::{Domain, Socket, Type};
57use std::{
58    collections::HashSet,
59    io,
60    net::{SocketAddr, IpAddr, TcpListener},
61    pin::Pin,
62    sync::{Arc, RwLock},
63    task::{Context, Poll},
64    time::Duration,
65};
66
67use provider::{Provider, IfEvent};
68
/// The configuration for a TCP/IP transport capability for libp2p.
///
/// A [`GenTcpConfig`] implements the [`Transport`] interface and thus
/// is consumed on [`Transport::listen_on`] and [`Transport::dial`].
/// However, the config can be cheaply cloned to perform multiple such
/// operations with the same config.
///
/// The type parameter `T` selects the I/O provider; it is only used
/// at the type level and no value of `T` is stored.
#[derive(Clone, Debug)]
pub struct GenTcpConfig<T> {
    /// The type of the I/O provider.
    _impl: std::marker::PhantomData<T>,
    /// TTL to set for opened sockets, or `None` to keep default.
    ttl: Option<u32>,
    /// `TCP_NODELAY` to set for opened sockets, or `None` to keep default.
    nodelay: Option<bool>,
    /// Size of the listen backlog for listen sockets.
    backlog: u32,
    /// The configuration of port reuse when dialing.
    ///
    /// When enabled, the set of registered listen addresses sits behind
    /// an `Arc` and is therefore shared between clones of this config.
    port_reuse: PortReuse,
}
88
/// A TCP port number.
type Port = u16;
90
/// The configuration for port reuse of listening sockets.
///
/// Cloning an `Enabled` value clones the `Arc`, so all clones observe
/// and modify the same set of registered listen addresses.
#[derive(Debug, Clone)]
enum PortReuse {
    /// Port reuse is disabled, i.e. ephemeral local ports are
    /// used for outgoing TCP connections.
    Disabled,
    /// Port reuse when dialing is enabled, i.e. the local
    /// address and port that a new socket for an outgoing
    /// connection is bound to are chosen from an existing
    /// listening socket, if available.
    Enabled {
        /// The addresses and ports of the listening sockets
        /// registered as eligible for port reuse when dialing.
        listen_addrs: Arc<RwLock<HashSet<(IpAddr, Port)>>>
    },
}
107
108impl PortReuse {
109    /// Registers a socket address for port reuse.
110    ///
111    /// Has no effect if port reuse is disabled.
112    fn register(&mut self, ip: IpAddr, port: Port) {
113        if let PortReuse::Enabled { listen_addrs } = self {
114            log::trace!("Registering for port reuse: {}:{}", ip, port);
115            listen_addrs
116                .write()
117                .expect("`register()` and `unregister()` never panic while holding the lock")
118                .insert((ip, port));
119        }
120    }
121
122    /// Unregisters a socket address for port reuse.
123    ///
124    /// Has no effect if port reuse is disabled.
125    fn unregister(&mut self, ip: IpAddr, port: Port) {
126        if let PortReuse::Enabled { listen_addrs } = self {
127            log::trace!("Unregistering for port reuse: {}:{}", ip, port);
128            listen_addrs
129                .write()
130                .expect("`register()` and `unregister()` never panic while holding the lock")
131                .remove(&(ip, port));
132        }
133    }
134
135    /// Selects a listening socket address suitable for use
136    /// as the local socket address when dialing.
137    ///
138    /// If multiple listening sockets are registered for port
139    /// reuse, one is chosen whose IP protocol version and
140    /// loopback status is the same as that of `remote_ip`.
141    ///
142    /// Returns `None` if port reuse is disabled or no suitable
143    /// listening socket address is found.
144    fn local_dial_addr(&self, remote_ip: &IpAddr) -> Option<SocketAddr> {
145        if let PortReuse::Enabled { listen_addrs } = self {
146            for (ip, port) in listen_addrs
147                .read()
148                .expect("`register()` and `unregister()` never panic while holding the lock")
149                .iter()
150            {
151                if ip.is_ipv4() == remote_ip.is_ipv4()
152                    && ip.is_loopback() == remote_ip.is_loopback()
153                {
154                     return Some(SocketAddr::new(*ip, *port))
155                }
156            }
157        }
158
159        None
160    }
161}
162
163impl<T> GenTcpConfig<T>
164where
165    T: Provider + Send,
166{
167    /// Creates a new configuration for a TCP/IP transport:
168    ///
169    ///   * Nagle's algorithm, i.e. `TCP_NODELAY`, is _enabled_.
170    ///     See [`GenTcpConfig::nodelay`].
171    ///   * Reuse of listening ports is _disabled_.
172    ///     See [`GenTcpConfig::port_reuse`].
173    ///   * No custom `IP_TTL` is set. The default of the OS TCP stack applies.
174    ///     See [`GenTcpConfig::ttl`].
175    ///   * The size of the listen backlog for new listening sockets is `1024`.
176    ///     See [`GenTcpConfig::listen_backlog`].
177    pub fn new() -> Self {
178        Self {
179            ttl: None,
180            nodelay: None,
181            backlog: 1024,
182            port_reuse: PortReuse::Disabled,
183            _impl: std::marker::PhantomData,
184        }
185    }
186
187    /// Configures the `IP_TTL` option for new sockets.
188    pub fn ttl(mut self, value: u32) -> Self {
189        self.ttl = Some(value);
190        self
191    }
192
193    /// Configures the `TCP_NODELAY` option for new sockets.
194    pub fn nodelay(mut self, value: bool) -> Self {
195        self.nodelay = Some(value);
196        self
197    }
198
199    /// Configures the listen backlog for new listen sockets.
200    pub fn listen_backlog(mut self, backlog: u32) -> Self {
201        self.backlog = backlog;
202        self
203    }
204
205    /// Configures port reuse for local sockets, which implies
206    /// reuse of listening ports for outgoing connections to
207    /// enhance NAT traversal capabilities.
208    ///
209    /// Please refer to e.g. [RFC 4787](https://tools.ietf.org/html/rfc4787)
210    /// section 4 and 5 for some of the NAT terminology used here.
211    ///
212    /// There are two main use-cases for port reuse among local
213    /// sockets:
214    ///
215    ///   1. Creating multiple listening sockets for the same address
216    ///      and port to allow accepting connections on multiple threads
217    ///      without having to synchronise access to a single listen socket.
218    ///
219    ///   2. Creating outgoing connections whose local socket is bound to
220    ///      the same address and port as a listening socket. In the rare
221    ///      case of simple NATs with both endpoint-independent mapping and
222    ///      endpoint-independent filtering, this can on its own already
223    ///      permit NAT traversal by other nodes sharing the observed
224    ///      external address of the local node. For the common case of
225    ///      NATs with address-dependent or address and port-dependent
226    ///      filtering, port reuse for outgoing connections can facilitate
227    ///      further TCP hole punching techniques for NATs that perform
228    ///      endpoint-independent mapping. Port reuse cannot facilitate
229    ///      NAT traversal in the presence of "symmetric" NATs that employ
230    ///      both address/port-dependent mapping and filtering, unless
231    ///      there is some means of port prediction.
232    ///
233    /// Both use-cases are enabled when port reuse is enabled, with port reuse
234    /// for outgoing connections (`2.` above) always being implied.
235    ///
236    /// > **Note**: Due to the identification of a TCP socket by a 4-tuple
237    /// > of source IP address, source port, destination IP address and
238    /// > destination port, with port reuse enabled there can be only
239    /// > a single outgoing connection to a particular address and port
240    /// > of a peer per local listening socket address.
241    ///
242    /// If enabled, the returned `GenTcpConfig` and all of its `Clone`s
243    /// keep track of the listen socket addresses as they are reported
244    /// by polling [`TcpListenStream`]s obtained from [`GenTcpConfig::listen_on()`].
245    ///
246    /// In contrast, two `GenTcpConfig`s constructed separately via [`GenTcpConfig::new()`]
247    /// maintain these addresses independently. It is thus possible to listen on
248    /// multiple addresses, enabling port reuse for each, knowing exactly which
249    /// listen address is reused when dialing with a specific `GenTcpConfig`, as in
250    /// the following example:
251    ///
252    /// ```no_run
253    /// # use tet_libp2p_core::transport::ListenerEvent;
254    /// # use tet_libp2p_core::{Multiaddr, Transport};
255    /// # use futures::stream::StreamExt;
256    /// #[cfg(feature = "async-io")]
257    /// #[async_std::main]
258    /// async fn main() -> std::io::Result<()> {
259    /// use tet_libp2p_tcp::TcpConfig;
260    ///
261    /// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap();
262    /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap();
263    ///
264    /// let tcp1 = TcpConfig::new().port_reuse(true);
265    /// let mut listener1 = tcp1.clone().listen_on(listen_addr1.clone()).expect("listener");
266    /// match listener1.next().await.expect("event")? {
267    ///     ListenerEvent::NewAddress(listen_addr) => {
268    ///         println!("Listening on {:?}", listen_addr);
269    ///         let mut stream = tcp1.dial(listen_addr2.clone()).unwrap().await?;
270    ///         // `stream` has `listen_addr1` as its local socket address.
271    ///     }
272    ///     _ => {}
273    /// }
274    ///
275    /// let tcp2 = TcpConfig::new().port_reuse(true);
276    /// let mut listener2 = tcp2.clone().listen_on(listen_addr2).expect("listener");
277    /// match listener2.next().await.expect("event")? {
278    ///     ListenerEvent::NewAddress(listen_addr) => {
279    ///         println!("Listening on {:?}", listen_addr);
280    ///         let mut socket = tcp2.dial(listen_addr1).unwrap().await?;
281    ///         // `stream` has `listen_addr2` as its local socket address.
282    ///     }
283    ///     _ => {}
284    /// }
285    /// Ok(())
286    /// }
287    /// ```
288    ///
289    /// If a single `GenTcpConfig` is used and cloned for the creation of multiple
290    /// listening sockets or a wildcard listen socket address is used to listen
291    /// on any interface, there can be multiple such addresses registered for
292    /// port reuse. In this case, one is chosen whose IP protocol version and
293    /// loopback status is the same as that of the remote address. Consequently, for
294    /// maximum control of the local listening addresses and ports that are used
295    /// for outgoing connections, a new `GenTcpConfig` should be created for each
296    /// listening socket, avoiding the use of wildcard addresses which bind a
297    /// socket to all network interfaces.
298    ///
299    /// When this option is enabled on a unix system, the socket
300    /// option `SO_REUSEPORT` is set, if available, to permit
301    /// reuse of listening ports for multiple sockets.
302    pub fn port_reuse(mut self, port_reuse: bool) -> Self {
303        self.port_reuse = if port_reuse {
304            PortReuse::Enabled {
305                listen_addrs: Arc::new(RwLock::new(HashSet::new()))
306            }
307        } else {
308            PortReuse::Disabled
309        };
310
311        self
312    }
313
314    fn create_socket(&self, socket_addr: &SocketAddr) -> io::Result<Socket> {
315        let domain = if socket_addr.is_ipv4() {
316            Domain::ipv4()
317        } else {
318            Domain::ipv6()
319        };
320        let socket = Socket::new(domain, Type::stream(), Some(socket2::Protocol::tcp()))?;
321        if socket_addr.is_ipv6() {
322            socket.set_only_v6(true)?;
323        }
324        if let Some(ttl) = self.ttl {
325            socket.set_ttl(ttl)?;
326        }
327        if let Some(nodelay) = self.nodelay {
328            socket.set_nodelay(nodelay)?;
329        }
330        socket.set_reuse_address(true)?;
331        #[cfg(unix)]
332        if let PortReuse::Enabled { .. } = &self.port_reuse {
333            socket.set_reuse_port(true)?;
334        }
335        Ok(socket)
336    }
337
338    fn do_listen(self, socket_addr: SocketAddr) -> io::Result<TcpListenStream<T>> {
339        let socket = self.create_socket(&socket_addr)?;
340        socket.bind(&socket_addr.into())?;
341        socket.listen(self.backlog as _)?;
342        socket.set_nonblocking(true)?;
343        TcpListenStream::<T>::new(socket.into_tcp_listener(), self.port_reuse)
344    }
345
346    async fn do_dial(self, socket_addr: SocketAddr) -> Result<T::Stream, io::Error> {
347        let socket = self.create_socket(&socket_addr)?;
348
349        if let Some(addr) = self.port_reuse.local_dial_addr(&socket_addr.ip()) {
350            log::trace!("Binding dial socket to listen socket {}", addr);
351            socket.bind(&addr.into())?;
352        }
353
354        socket.set_nonblocking(true)?;
355
356        match socket.connect(&socket_addr.into()) {
357            Ok(()) => {}
358            Err(err) if err.raw_os_error() == Some(libc::EINPROGRESS) => {}
359            Err(err) if err.kind() == io::ErrorKind::WouldBlock => {}
360            Err(err) => return Err(err),
361        };
362
363        let stream = T::new_stream(socket.into_tcp_stream()).await?;
364        Ok(stream)
365    }
366}
367
368impl<T> Transport for GenTcpConfig<T>
369where
370    T: Provider + Send + 'static,
371    T::Listener: Unpin,
372    T::IfWatcher: Unpin,
373    T::Stream: Unpin,
374{
375    type Output = T::Stream;
376    type Error = io::Error;
377    type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
378    type Listener = TcpListenStream<T>;
379    type ListenerUpgrade = Ready<Result<Self::Output, Self::Error>>;
380
381    fn listen_on(self, addr: Multiaddr) -> Result<Self::Listener, TransportError<Self::Error>> {
382        let socket_addr = if let Ok(sa) = multiaddr_to_socketaddr(&addr) {
383            sa
384        } else {
385            return Err(TransportError::MultiaddrNotSupported(addr));
386        };
387        log::debug!("listening on {}", socket_addr);
388        self.do_listen(socket_addr)
389            .map_err(|e| TransportError::Other(e))
390    }
391
392    fn dial(self, addr: Multiaddr) -> Result<Self::Dial, TransportError<Self::Error>> {
393        let socket_addr = if let Ok(socket_addr) = multiaddr_to_socketaddr(&addr) {
394            if socket_addr.port() == 0 || socket_addr.ip().is_unspecified() {
395                return Err(TransportError::MultiaddrNotSupported(addr));
396            }
397            socket_addr
398        } else {
399            return Err(TransportError::MultiaddrNotSupported(addr));
400        };
401        log::debug!("dialing {}", socket_addr);
402        Ok(Box::pin(self.do_dial(socket_addr)))
403    }
404
405    /// When port reuse is disabled and hence ephemeral local ports are
406    /// used for outgoing connections, the returned address is the
407    /// `observed` address with the port replaced by the port of the
408    /// `listen` address.
409    ///
410    /// If port reuse is enabled, `Some(observed)` is returned, as there
411    /// is a chance that the `observed` address _and_ port are reachable
412    /// for other peers if there is a NAT in the way that does endpoint-
413    /// independent filtering. Furthermore, even if that is not the case
414    /// and TCP hole punching techniques must be used for NAT traversal,
415    /// the `observed` address is still the one that a remote should connect
416    /// to for the purpose of the hole punching procedure, as it represents
417    /// the mapped IP and port of the NAT device in front of the local
418    /// node.
419    ///
420    /// `None` is returned if one of the given addresses is not a TCP/IP
421    /// address.
422    fn address_translation(&self, listen: &Multiaddr, observed: &Multiaddr) -> Option<Multiaddr> {
423        match &self.port_reuse {
424            PortReuse::Disabled => address_translation(listen, observed),
425            PortReuse::Enabled { .. } => Some(observed.clone()),
426        }
427    }
428}
429
/// The [`ListenerEvent`] type produced by a [`TcpListenStream`] whose
/// connections are of type `S`: the upgrade is an immediately ready
/// future and errors are `io::Error`s.
type TcpListenerEvent<S> = ListenerEvent<Ready<Result<S, io::Error>>, io::Error>;
431
/// The state of the network interface watcher of a [`TcpListenStream`]
/// that listens on all interfaces.
enum IfWatch<TIfWatcher> {
    /// The watcher is still being created by the given future.
    Pending(BoxFuture<'static, io::Result<TIfWatcher>>),
    /// The watcher is ready to be polled for interface events.
    Ready(TIfWatcher),
}
436
/// The listening addresses of a [`TcpListenStream`].
enum InAddr<TIfWatcher> {
    /// The stream accepts connections on a single interface.
    One {
        /// The IP address of the interface.
        addr: IpAddr,
        /// The listen address that still has to be reported; it is
        /// taken (leaving `None`) once it has been reported.
        out: Option<Multiaddr>
    },
    /// The stream accepts connections on all interfaces.
    Any {
        /// The interface addresses currently known to be up.
        addrs: HashSet<IpAddr>,
        /// The watcher reporting interfaces going up or down.
        if_watch: IfWatch<TIfWatcher>,
    }
}
450
/// A stream of incoming connections on one or more interfaces.
///
/// Besides incoming connections, the stream reports listen addresses
/// that become available or expire, as well as non-fatal errors.
pub struct TcpListenStream<T>
where
    T: Provider
{
    /// The socket address that the listening socket is bound to,
    /// which may be a "wildcard address" like `INADDR_ANY` or `IN6ADDR_ANY`
    /// when listening on all interfaces for IPv4 respectively IPv6 connections.
    listen_addr: SocketAddr,
    /// The async listening socket for incoming connections.
    listener: T::Listener,
    /// The IP addresses of network interfaces on which the listening socket
    /// is accepting connections.
    ///
    /// If the listen socket listens on all interfaces, these may change over
    /// time as interfaces become available or unavailable.
    in_addr: InAddr<T::IfWatcher>,
    /// The port reuse configuration for outgoing connections.
    ///
    /// If enabled, all IP addresses on which this listening stream
    /// is accepting connections (`in_addr`) are registered for reuse
    /// as local addresses for the sockets of outgoing connections. They are
    /// unregistered when the stream encounters an error or is dropped.
    port_reuse: PortReuse,
    /// How long to sleep after a (non-fatal) error while trying
    /// to accept a new connection.
    sleep_on_error: Duration,
    /// The current pause, if any.
    pause: Option<Delay>,
}
481
482impl<T> TcpListenStream<T>
483where
484    T: Provider
485{
486    /// Constructs a `TcpListenStream` for incoming connections around
487    /// the given `TcpListener`.
488    fn new(listener: TcpListener, port_reuse: PortReuse) -> io::Result<Self> {
489        let listen_addr = listener.local_addr()?;
490
491        let in_addr = if match &listen_addr {
492            SocketAddr::V4(a) => a.ip().is_unspecified(),
493            SocketAddr::V6(a) => a.ip().is_unspecified(),
494        } {
495            // The `addrs` are populated via `if_watch` when the
496            // `TcpListenStream` is polled.
497            InAddr::Any {
498                addrs: HashSet::new(),
499                if_watch: IfWatch::Pending(T::if_watcher()),
500            }
501        } else {
502            InAddr::One {
503                out: Some(ip_to_multiaddr(listen_addr.ip(), listen_addr.port())),
504                addr: listen_addr.ip(),
505            }
506        };
507
508        let listener = T::new_listener(listener)?;
509
510        Ok(TcpListenStream {
511            port_reuse,
512            listener,
513            listen_addr,
514            in_addr,
515            pause: None,
516            sleep_on_error: Duration::from_millis(100),
517        })
518    }
519
520    /// Disables port reuse for any listen address of this stream.
521    ///
522    /// This is done when the `TcpListenStream` encounters a fatal
523    /// error (for the stream) or is dropped.
524    ///
525    /// Has no effect if port reuse is disabled.
526    fn disable_port_reuse(&mut self) {
527        match &self.in_addr {
528            InAddr::One { addr, .. } => {
529                self.port_reuse.unregister(*addr, self.listen_addr.port());
530            },
531            InAddr::Any { addrs, .. } => {
532                for addr in addrs {
533                    self.port_reuse.unregister(*addr, self.listen_addr.port());
534                }
535            }
536        }
537    }
538}
539
/// Dropping a listen stream unregisters all of its listen addresses
/// from port reuse.
impl<T> Drop for TcpListenStream<T>
where
    T: Provider
{
    fn drop(&mut self) {
        self.disable_port_reuse();
    }
}
548
/// Drives the listener: reports listen address changes first and then
/// yields incoming connections.
impl<T> Stream for TcpListenStream<T>
where
    T: Provider,
    T::Listener: Unpin,
    T::Stream: Unpin,
    T::IfWatcher: Unpin,
{
    type Item = Result<TcpListenerEvent<T::Stream>, io::Error>;

    /// Polls for the next listener event.
    ///
    /// On each call the listen addresses are handled first (interface
    /// up/down events for wildcard listeners, or the one-time report of
    /// the single listen address), then a pending pause is awaited and
    /// finally a connection is accepted.
    ///
    /// As written, this never yields `None` nor `Some(Err(_))`: all
    /// failures are reported as `ListenerEvent::Error` and followed by a
    /// pause of `sleep_on_error`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let me = Pin::into_inner(self);

        loop {
            match &mut me.in_addr {
                InAddr::Any { if_watch, addrs } => match if_watch {
                    // If we listen on all interfaces, wait for `if-watch` to be ready.
                    IfWatch::Pending(f) => match ready!(Pin::new(f).poll(cx)) {
                        Ok(w) => {
                            *if_watch = IfWatch::Ready(w);
                            continue
                        }
                        Err(err) => {
                            log::debug! {
                                "Failed to begin observing interfaces: {:?}. Scheduling retry.",
                                err
                            };
                            // Start over with a fresh watcher future.
                            // NOTE(review): the pause set here is only awaited below,
                            // after the interface handling, so the new watcher future
                            // is polled on the next call regardless of the pause.
                            // Confirm this is intended.
                            *if_watch = IfWatch::Pending(T::if_watcher());
                            me.pause = Some(Delay::new(me.sleep_on_error));
                            return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
                        }
                    },
                    // Consume all events for up/down interface changes.
                    IfWatch::Ready(watch) => while let Poll::Ready(ev) = T::poll_interfaces(watch, cx) {
                        match ev {
                            Ok(IfEvent::Up(inet)) => {
                                let ip = inet.addr();
                                // Only addresses of the listener's IP version are relevant.
                                if me.listen_addr.is_ipv4() == ip.is_ipv4() {
                                    // Report an address only if it was not known before.
                                    if addrs.insert(ip) {
                                        let ma = ip_to_multiaddr(ip, me.listen_addr.port());
                                        log::debug!("New listen address: {}", ma);
                                        me.port_reuse.register(ip, me.listen_addr.port());
                                        return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(ma))));
                                    }
                                }
                            }
                            Ok(IfEvent::Down(inet)) => {
                                let ip = inet.addr();
                                if me.listen_addr.is_ipv4() == ip.is_ipv4() {
                                    // Report the expiry only if the address was known.
                                    if addrs.remove(&ip) {
                                        let ma = ip_to_multiaddr(ip, me.listen_addr.port());
                                        log::debug!("Expired listen address: {}", ma);
                                        me.port_reuse.unregister(ip, me.listen_addr.port());
                                        return Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(ma))));
                                    }
                                }
                            }
                            Err(err) => {
                                log::debug! {
                                    "Failure polling interfaces: {:?}. Scheduling retry.",
                                    err
                                };
                                me.pause = Some(Delay::new(me.sleep_on_error));
                                return Poll::Ready(Some(Ok(ListenerEvent::Error(err))));
                            }
                        }
                    },
                },
                // If the listener is bound to a single interface, make sure the
                // address is registered for port reuse and reported once.
                InAddr::One { addr, out } => if let Some(multiaddr) = out.take() {
                    me.port_reuse.register(*addr, me.listen_addr.port());
                    return Poll::Ready(Some(Ok(ListenerEvent::NewAddress(multiaddr))))
                }
            }

            // Wait for a pause scheduled after an earlier error to elapse
            // before accepting connections; an unfinished pause is put back.
            if let Some(mut pause) = me.pause.take() {
                match Pin::new(&mut pause).poll(cx) {
                    Poll::Ready(_) => {}
                    Poll::Pending => {
                        me.pause = Some(pause);
                        return Poll::Pending;
                    }
                }
            }

            // Take the pending connection from the backlog.
            let incoming = match T::poll_accept(&mut me.listener, cx) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(Ok(incoming)) => incoming,
                Poll::Ready(Err(e)) => {
                    // These errors are non-fatal for the listener stream.
                    log::error!("error accepting incoming connection: {}", e);
                    me.pause = Some(Delay::new(me.sleep_on_error));
                    return Poll::Ready(Some(Ok(ListenerEvent::Error(e))));
                }
            };

            let local_addr = ip_to_multiaddr(incoming.local_addr.ip(), incoming.local_addr.port());
            let remote_addr = ip_to_multiaddr(incoming.remote_addr.ip(), incoming.remote_addr.port());

            log::debug!("Incoming connection from {} at {}", remote_addr, local_addr);

            // The connection is already established, so the upgrade is
            // an immediately ready future.
            return Poll::Ready(Some(Ok(ListenerEvent::Upgrade {
                upgrade: future::ok(incoming.stream),
                local_addr,
                remote_addr,
            })));
        }
    }
}
659
660// This type of logic should probably be moved into the multiaddr package
661fn multiaddr_to_socketaddr(addr: &Multiaddr) -> Result<SocketAddr, ()> {
662    let mut iter = addr.iter();
663    let proto1 = iter.next().ok_or(())?;
664    let proto2 = iter.next().ok_or(())?;
665
666    if iter.next().is_some() {
667        return Err(());
668    }
669
670    match (proto1, proto2) {
671        (Protocol::Ip4(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
672        (Protocol::Ip6(ip), Protocol::Tcp(port)) => Ok(SocketAddr::new(ip.into(), port)),
673        _ => Err(()),
674    }
675}
676
677// Create a [`Multiaddr`] from the given IP address and port number.
678fn ip_to_multiaddr(ip: IpAddr, port: u16) -> Multiaddr {
679    Multiaddr::empty()
680        .with(ip.into())
681        .with(Protocol::Tcp(port))
682}
683
684#[cfg(test)]
685mod tests {
686    use futures::channel::mpsc;
687    use super::*;
688
689    #[test]
690    fn multiaddr_to_tcp_conversion() {
691        use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
692
693        assert!(
694            multiaddr_to_socketaddr(&"/ip4/127.0.0.1/udp/1234".parse::<Multiaddr>().unwrap())
695                .is_err()
696        );
697
698        assert_eq!(
699            multiaddr_to_socketaddr(&"/ip4/127.0.0.1/tcp/12345".parse::<Multiaddr>().unwrap()),
700            Ok(SocketAddr::new(
701                IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)),
702                12345,
703            ))
704        );
705        assert_eq!(
706            multiaddr_to_socketaddr(
707                &"/ip4/255.255.255.255/tcp/8080"
708                    .parse::<Multiaddr>()
709                    .unwrap()
710            ),
711            Ok(SocketAddr::new(
712                IpAddr::V4(Ipv4Addr::new(255, 255, 255, 255)),
713                8080,
714            ))
715        );
716        assert_eq!(
717            multiaddr_to_socketaddr(&"/ip6/::1/tcp/12345".parse::<Multiaddr>().unwrap()),
718            Ok(SocketAddr::new(
719                IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)),
720                12345,
721            ))
722        );
723        assert_eq!(
724            multiaddr_to_socketaddr(
725                &"/ip6/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff/tcp/8080"
726                    .parse::<Multiaddr>()
727                    .unwrap()
728            ),
729            Ok(SocketAddr::new(
730                IpAddr::V6(Ipv6Addr::new(
731                    65535, 65535, 65535, 65535, 65535, 65535, 65535, 65535,
732                )),
733                8080,
734            ))
735        );
736    }
737
    /// Runs a listener and a dialer against each other over the loopback
    /// interface, for IPv4 and IPv6 and for each enabled I/O provider,
    /// and checks that both sides receive the bytes sent by the other.
    #[test]
    fn communicating_between_dialer_and_listener() {
        env_logger::try_init().ok();

        // Listens on `addr`, forwards each reported listen address through
        // `ready_tx` and serves exactly one connection: expects `[1, 2, 3]`
        // and answers with `[4, 5, 6]`.
        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>)  {
            let tcp = GenTcpConfig::<T>::new();
            let mut listener = tcp.listen_on(addr).unwrap();
            loop {
                match listener.next().await.unwrap().unwrap() {
                    ListenerEvent::NewAddress(listen_addr) => {
                        ready_tx.send(listen_addr).await.unwrap();
                    }
                    ListenerEvent::Upgrade { upgrade, .. } => {
                        let mut upgrade = upgrade.await.unwrap();
                        let mut buf = [0u8; 3];
                        upgrade.read_exact(&mut buf).await.unwrap();
                        assert_eq!(buf, [1, 2, 3]);
                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
                        return
                    }
                    e => panic!("Unexpected listener event: {:?}", e),
                }
            }
        }

        // Waits for the listen address from `ready_rx`, dials it, sends
        // `[1, 2, 3]` and expects `[4, 5, 6]` in return.
        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
            let addr = ready_rx.next().await.unwrap();
            let tcp = GenTcpConfig::<T>::new();

            // Obtain a future socket through dialing
            let mut socket = tcp.dial(addr.clone()).unwrap().await.unwrap();
            socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();

            let mut buf = [0u8; 3];
            socket.read_exact(&mut buf).await.unwrap();
            assert_eq!(buf, [4, 5, 6]);
        }

        // Runs the listener/dialer pair once per enabled provider feature.
        fn test(addr: Multiaddr) {
            #[cfg(feature = "async-io")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<async_io::Tcp>(ready_rx);
                let listener = async_std::task::spawn(listener);
                async_std::task::block_on(dialer);
                async_std::task::block_on(listener);
            }

            #[cfg(feature = "tokio")]
            {
                let (ready_tx, ready_rx) = mpsc::channel(1);
                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
                let dialer = dialer::<tokio::Tcp>(ready_rx);
                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
                let tasks = tokio_crate::task::LocalSet::new();
                let listener = tasks.spawn_local(listener);
                tasks.block_on(&rt, dialer);
                tasks.block_on(&rt, listener).unwrap();
            }
        }

        // Port 0 lets the OS pick a free port; the listener reports the
        // actual address to the dialer.
        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
        test("/ip6/::1/tcp/0".parse().unwrap());
    }
803
804    #[test]
805    fn wildcard_expansion() {
806        env_logger::try_init().ok();
807
808        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
809            let tcp = GenTcpConfig::<T>::new();
810            let mut listener = tcp.listen_on(addr).unwrap();
811
812            loop {
813                match listener.next().await.unwrap().unwrap() {
814                    ListenerEvent::NewAddress(a) => {
815                        let mut iter = a.iter();
816                        match iter.next().expect("ip address") {
817                            Protocol::Ip4(ip) => assert!(!ip.is_unspecified()),
818                            Protocol::Ip6(ip) => assert!(!ip.is_unspecified()),
819                            other => panic!("Unexpected protocol: {}", other),
820                        }
821                        if let Protocol::Tcp(port) = iter.next().expect("port") {
822                            assert_ne!(0, port)
823                        } else {
824                            panic!("No TCP port in address: {}", a)
825                        }
826                        ready_tx.send(a).await.ok();
827                        return
828                    }
829                    _ => {}
830                }
831            }
832        }
833
834        async fn dialer<T: Provider>(mut ready_rx: mpsc::Receiver<Multiaddr>) {
835            let dest_addr = ready_rx.next().await.unwrap();
836            let tcp = GenTcpConfig::<T>::new();
837            tcp.dial(dest_addr).unwrap().await.unwrap();
838        }
839
840        fn test(addr: Multiaddr) {
841            #[cfg(feature = "async-io")]
842            {
843                let (ready_tx, ready_rx) = mpsc::channel(1);
844                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
845                let dialer = dialer::<async_io::Tcp>(ready_rx);
846                let listener = async_std::task::spawn(listener);
847                async_std::task::block_on(dialer);
848                async_std::task::block_on(listener);
849            }
850
851            #[cfg(feature = "tokio")]
852            {
853                let (ready_tx, ready_rx) = mpsc::channel(1);
854                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
855                let dialer = dialer::<tokio::Tcp>(ready_rx);
856                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
857                let tasks = tokio_crate::task::LocalSet::new();
858                let listener = tasks.spawn_local(listener);
859                tasks.block_on(&rt, dialer);
860                tasks.block_on(&rt, listener).unwrap();
861            }
862        }
863
864        test("/ip4/0.0.0.0/tcp/0".parse().unwrap());
865        test("/ip6/::1/tcp/0".parse().unwrap());
866    }
867
868    #[test]
869    fn port_reuse_dialing() {
870        env_logger::try_init().ok();
871
872        async fn listener<T: Provider>(addr: Multiaddr, mut ready_tx: mpsc::Sender<Multiaddr>) {
873            let tcp = GenTcpConfig::<T>::new();
874            let mut listener = tcp.listen_on(addr).unwrap();
875            loop {
876                match listener.next().await.unwrap().unwrap() {
877                    ListenerEvent::NewAddress(listen_addr) => {
878                        ready_tx.send(listen_addr).await.ok();
879                    }
880                    ListenerEvent::Upgrade { upgrade, .. } => {
881                        let mut upgrade = upgrade.await.unwrap();
882                        let mut buf = [0u8; 3];
883                        upgrade.read_exact(&mut buf).await.unwrap();
884                        assert_eq!(buf, [1, 2, 3]);
885                        upgrade.write_all(&[4, 5, 6]).await.unwrap();
886                        return
887                    }
888                    e => panic!("Unexpected event: {:?}", e),
889                }
890            }
891        }
892
893        async fn dialer<T: Provider>(addr: Multiaddr, mut ready_rx: mpsc::Receiver<Multiaddr>) {
894            let dest_addr = ready_rx.next().await.unwrap();
895            let tcp = GenTcpConfig::<T>::new().port_reuse(true);
896            let mut listener = tcp.clone().listen_on(addr).unwrap();
897            match listener.next().await.unwrap().unwrap() {
898                ListenerEvent::NewAddress(_) => {
899                    // Obtain a future socket through dialing
900                    let mut socket = tcp.dial(dest_addr).unwrap().await.unwrap();
901                    socket.write_all(&[0x1, 0x2, 0x3]).await.unwrap();
902                    // socket.flush().await;
903                    let mut buf = [0u8; 3];
904                    socket.read_exact(&mut buf).await.unwrap();
905                    assert_eq!(buf, [4, 5, 6]);
906                }
907                e => panic!("Unexpected listener event: {:?}", e)
908            }
909        }
910
911        fn test(addr: Multiaddr) {
912            #[cfg(feature = "async-io")]
913            {
914                let (ready_tx, ready_rx) = mpsc::channel(1);
915                let listener = listener::<async_io::Tcp>(addr.clone(), ready_tx);
916                let dialer = dialer::<async_io::Tcp>(addr.clone(), ready_rx);
917                let listener = async_std::task::spawn(listener);
918                async_std::task::block_on(dialer);
919                async_std::task::block_on(listener);
920            }
921
922            #[cfg(feature = "tokio")]
923            {
924                let (ready_tx, ready_rx) = mpsc::channel(1);
925                let listener = listener::<tokio::Tcp>(addr.clone(), ready_tx);
926                let dialer = dialer::<tokio::Tcp>(addr.clone(), ready_rx);
927                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
928                let tasks = tokio_crate::task::LocalSet::new();
929                let listener = tasks.spawn_local(listener);
930                tasks.block_on(&rt, dialer);
931                tasks.block_on(&rt, listener).unwrap();
932            }
933        }
934
935        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
936        test("/ip6/::1/tcp/0".parse().unwrap());
937    }
938
939    #[test]
940    fn port_reuse_listening() {
941        env_logger::try_init().ok();
942
943        async fn listen_twice<T: Provider>(addr: Multiaddr) {
944            let tcp = GenTcpConfig::<T>::new().port_reuse(true);
945            let mut listener1 = tcp.clone().listen_on(addr).unwrap();
946            match listener1.next().await.unwrap().unwrap() {
947                ListenerEvent::NewAddress(addr1) => {
948                    // Listen on the same address a second time.
949                    let mut listener2 = tcp.clone().listen_on(addr1.clone()).unwrap();
950                    match listener2.next().await.unwrap().unwrap() {
951                        ListenerEvent::NewAddress(addr2) => {
952                            assert_eq!(addr1, addr2);
953                            return
954                        }
955                        e => panic!("Unexpected listener event: {:?}", e),
956                    }
957                }
958                e => panic!("Unexpected listener event: {:?}", e),
959            }
960        }
961
962        fn test(addr: Multiaddr) {
963            #[cfg(feature = "async-io")]
964            {
965                let listener = listen_twice::<async_io::Tcp>(addr.clone());
966                async_std::task::block_on(listener);
967            }
968
969            #[cfg(feature = "tokio")]
970            {
971                let listener = listen_twice::<tokio::Tcp>(addr.clone());
972                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
973                rt.block_on(listener);
974            }
975        }
976
977        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
978    }
979
980    #[test]
981    fn listen_port_0() {
982        env_logger::try_init().ok();
983
984        async fn listen<T: Provider>(addr: Multiaddr) -> Multiaddr {
985            GenTcpConfig::<T>::new()
986                .listen_on(addr)
987                .unwrap()
988                .next()
989                .await
990                .expect("some event")
991                .expect("no error")
992                .into_new_address()
993                .expect("listen address")
994        }
995
996        fn test(addr: Multiaddr) {
997            #[cfg(feature = "async-io")]
998            {
999                let new_addr = async_std::task::block_on(listen::<async_io::Tcp>(addr.clone()));
1000                assert!(!new_addr.to_string().contains("tcp/0"));
1001            }
1002
1003            #[cfg(feature = "tokio")]
1004            {
1005                let rt = tokio_crate::runtime::Builder::new_current_thread().enable_io().build().unwrap();
1006                let new_addr = rt.block_on(listen::<tokio::Tcp>(addr.clone()));
1007                assert!(!new_addr.to_string().contains("tcp/0"));
1008            }
1009        }
1010
1011        test("/ip6/::1/tcp/0".parse().unwrap());
1012        test("/ip4/127.0.0.1/tcp/0".parse().unwrap());
1013    }
1014
1015    #[test]
1016    fn listen_invalid_addr() {
1017        env_logger::try_init().ok();
1018
1019        fn test(addr: Multiaddr) {
1020            #[cfg(feature = "async-io")]
1021            {
1022                let tcp = TcpConfig::new();
1023                assert!(tcp.listen_on(addr.clone()).is_err());
1024            }
1025
1026            #[cfg(feature = "tokio")]
1027            {
1028                let tcp = TokioTcpConfig::new();
1029                assert!(tcp.listen_on(addr.clone()).is_err());
1030            }
1031        }
1032
1033        test("/ip4/127.0.0.1/tcp/12345/tcp/12345".parse().unwrap());
1034    }
1035}