libp2p_core/connection/
listeners.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Manage listening on multiple multiaddresses at once.
22
23use crate::{Multiaddr, Transport, transport::{TransportError, ListenerEvent}};
24use futures::{prelude::*, task::Context, task::Poll};
25use log::debug;
26use smallvec::SmallVec;
27use std::{collections::VecDeque, fmt, pin::Pin};
28
29/// Implementation of `futures::Stream` that allows listening on multiaddresses.
30///
31/// To start using a `ListenersStream`, create one with `new` by passing an implementation of
32/// `Transport`. This `Transport` will be used to start listening, therefore you want to pass
33/// a `Transport` that supports the protocols you wish you listen on.
34///
35/// Then, call `ListenerStream::listen_on` for all addresses you want to start listening on.
36///
37/// The `ListenersStream` never ends and never produces errors. If a listener errors or closes,
38/// an event is generated on the stream and the listener is then dropped, but the `ListenersStream`
39/// itself continues.
40///
41/// # Example
42///
43/// ```no_run
44/// use futures::prelude::*;
45/// use libp2p_core::connection::{ListenersEvent, ListenersStream};
46///
47/// let mut listeners = ListenersStream::new(libp2p_tcp::TcpConfig::new());
48///
49/// // Ask the `listeners` to start listening on the given multiaddress.
50/// listeners.listen_on("/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap();
51///
52/// // The `listeners` will now generate events when polled.
53/// futures::executor::block_on(async move {
54///     while let Some(event) = listeners.next().await {
55///         match event {
56///             ListenersEvent::NewAddress { listener_id, listen_addr } => {
57///                 println!("Listener {:?} is listening at address {}", listener_id, listen_addr);
58///             },
59///             ListenersEvent::AddressExpired { listener_id, listen_addr } => {
60///                 println!("Listener {:?} is no longer listening at address {}", listener_id, listen_addr);
61///             },
62///             ListenersEvent::Closed { listener_id, .. } => {
63///                 println!("Listener {:?} has been closed", listener_id);
64///             },
65///             ListenersEvent::Error { listener_id, error } => {
66///                 println!("Listener {:?} has experienced an error: {}", listener_id, error);
67///             },
68///             ListenersEvent::Incoming { listener_id, upgrade, local_addr, .. } => {
69///                 println!("Listener {:?} has a new connection on {}", listener_id, local_addr);
70///                 // We don't do anything with the newly-opened connection, but in a real-life
71///                 // program you probably want to use it!
72///                 drop(upgrade);
73///             },
74///         }
75///     }
76/// })
77/// ```
78pub struct ListenersStream<TTrans>
79where
80    TTrans: Transport,
81{
82    /// Transport used to spawn listeners.
83    transport: TTrans,
84    /// All the active listeners.
85    /// The `Listener` struct contains a stream that we want to be pinned. Since the `VecDeque`
86    /// can be resized, the only way is to use a `Pin<Box<>>`.
87    listeners: VecDeque<Pin<Box<Listener<TTrans>>>>,
88    /// The next listener ID to assign.
89    next_id: ListenerId
90}
91
/// Identifier of one listener managed by a [`ListenersStream`].
///
/// [`ListenersEvent`]s carry the ID of the listener they concern, and the same
/// ID can be passed back to the [`ListenersStream`] to remove that listener.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ListenerId(u64);
98
99/// A single active listener.
100#[pin_project::pin_project]
101#[derive(Debug)]
102struct Listener<TTrans>
103where
104    TTrans: Transport,
105{
106    /// The ID of this listener.
107    id: ListenerId,
108    /// The object that actually listens.
109    #[pin]
110    listener: TTrans::Listener,
111    /// Addresses it is listening on.
112    addresses: SmallVec<[Multiaddr; 4]>
113}
114
115/// Event that can happen on the `ListenersStream`.
116pub enum ListenersEvent<TTrans>
117where
118    TTrans: Transport,
119{
120    /// A new address is being listened on.
121    NewAddress {
122        /// The listener that is listening on the new address.
123        listener_id: ListenerId,
124        /// The new address that is being listened on.
125        listen_addr: Multiaddr
126    },
127    /// An address is no longer being listened on.
128    AddressExpired {
129        /// The listener that is no longer listening on the address.
130        listener_id: ListenerId,
131        /// The new address that is being listened on.
132        listen_addr: Multiaddr
133    },
134    /// A connection is incoming on one of the listeners.
135    Incoming {
136        /// The listener that produced the upgrade.
137        listener_id: ListenerId,
138        /// The produced upgrade.
139        upgrade: TTrans::ListenerUpgrade,
140        /// Local connection address.
141        local_addr: Multiaddr,
142        /// Address used to send back data to the incoming client.
143        send_back_addr: Multiaddr,
144    },
145    /// A listener closed.
146    Closed {
147        /// The ID of the listener that closed.
148        listener_id: ListenerId,
149        /// The addresses that the listener was listening on.
150        addresses: Vec<Multiaddr>,
151        /// Reason for the closure. Contains `Ok(())` if the stream produced `None`, or `Err`
152        /// if the stream produced an error.
153        reason: Result<(), TTrans::Error>,
154    },
155    /// A listener errored.
156    ///
157    /// The listener will continue to be polled for new events and the event
158    /// is for informational purposes only.
159    Error {
160        /// The ID of the listener that errored.
161        listener_id: ListenerId,
162        /// The error value.
163        error: TTrans::Error,
164    }
165}
166
167impl<TTrans> ListenersStream<TTrans>
168where
169    TTrans: Transport,
170{
171    /// Starts a new stream of listeners.
172    pub fn new(transport: TTrans) -> Self {
173        ListenersStream {
174            transport,
175            listeners: VecDeque::new(),
176            next_id: ListenerId(1)
177        }
178    }
179
180    /// Same as `new`, but pre-allocates enough memory for the given number of
181    /// simultaneous listeners.
182    pub fn with_capacity(transport: TTrans, capacity: usize) -> Self {
183        ListenersStream {
184            transport,
185            listeners: VecDeque::with_capacity(capacity),
186            next_id: ListenerId(1)
187        }
188    }
189
190    /// Start listening on a multiaddress.
191    ///
192    /// Returns an error if the transport doesn't support the given multiaddress.
193    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>>
194    where
195        TTrans: Clone,
196    {
197        let listener = self.transport.clone().listen_on(addr)?;
198        self.listeners.push_back(Box::pin(Listener {
199            id: self.next_id,
200            listener,
201            addresses: SmallVec::new()
202        }));
203        let id = self.next_id;
204        self.next_id = ListenerId(self.next_id.0 + 1);
205        Ok(id)
206    }
207
208    /// Remove the listener matching the given `ListenerId`.
209    ///
210    /// Return `Ok(())` if a listener with this ID was in the list.
211    pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
212        if let Some(i) = self.listeners.iter().position(|l| l.id == id) {
213            self.listeners.remove(i);
214            Ok(())
215        } else {
216            Err(())
217        }
218    }
219
220    /// Returns the transport passed when building this object.
221    pub fn transport(&self) -> &TTrans {
222        &self.transport
223    }
224
225    /// Returns an iterator that produces the list of addresses we're listening on.
226    pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
227        self.listeners.iter().flat_map(|l| l.addresses.iter())
228    }
229
230    /// Provides an API similar to `Stream`, except that it cannot end.
231    pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<ListenersEvent<TTrans>> {
232        // We remove each element from `listeners` one by one and add them back.
233        let mut remaining = self.listeners.len();
234        while let Some(mut listener) = self.listeners.pop_back() {
235            let mut listener_project = listener.as_mut().project();
236            match TryStream::try_poll_next(listener_project.listener.as_mut(), cx) {
237                Poll::Pending => {
238                    self.listeners.push_front(listener);
239                    remaining -= 1;
240                    if remaining == 0 { break }
241                }
242                Poll::Ready(Some(Ok(ListenerEvent::Upgrade { upgrade, local_addr, remote_addr }))) => {
243                    let id = *listener_project.id;
244                    self.listeners.push_front(listener);
245                    return Poll::Ready(ListenersEvent::Incoming {
246                        listener_id: id,
247                        upgrade,
248                        local_addr,
249                        send_back_addr: remote_addr
250                    })
251                }
252                Poll::Ready(Some(Ok(ListenerEvent::NewAddress(a)))) => {
253                    if listener_project.addresses.contains(&a) {
254                        debug!("Transport has reported address {} multiple times", a)
255                    }
256                    if !listener_project.addresses.contains(&a) {
257                        listener_project.addresses.push(a.clone());
258                    }
259                    let id = *listener_project.id;
260                    self.listeners.push_front(listener);
261                    return Poll::Ready(ListenersEvent::NewAddress {
262                        listener_id: id,
263                        listen_addr: a
264                    })
265                }
266                Poll::Ready(Some(Ok(ListenerEvent::AddressExpired(a)))) => {
267                    listener_project.addresses.retain(|x| x != &a);
268                    let id = *listener_project.id;
269                    self.listeners.push_front(listener);
270                    return Poll::Ready(ListenersEvent::AddressExpired {
271                        listener_id: id,
272                        listen_addr: a
273                    })
274                }
275                Poll::Ready(Some(Ok(ListenerEvent::Error(error)))) => {
276                    let id = *listener_project.id;
277                    self.listeners.push_front(listener);
278                    return Poll::Ready(ListenersEvent::Error {
279                        listener_id: id,
280                        error,
281                    })
282                }
283                Poll::Ready(None) => {
284                    return Poll::Ready(ListenersEvent::Closed {
285                        listener_id: *listener_project.id,
286                        addresses: listener_project.addresses.drain(..).collect(),
287                        reason: Ok(()),
288                    })
289                }
290                Poll::Ready(Some(Err(err))) => {
291                    return Poll::Ready(ListenersEvent::Closed {
292                        listener_id: *listener_project.id,
293                        addresses: listener_project.addresses.drain(..).collect(),
294                        reason: Err(err),
295                    })
296                }
297            }
298        }
299
300        // We register the current task to be woken up if a new listener is added.
301        Poll::Pending
302    }
303}
304
305impl<TTrans> Stream for ListenersStream<TTrans>
306where
307    TTrans: Transport,
308{
309    type Item = ListenersEvent<TTrans>;
310
311    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
312        ListenersStream::poll(self, cx).map(Option::Some)
313    }
314}
315
316impl<TTrans> Unpin for ListenersStream<TTrans>
317where
318    TTrans: Transport,
319{
320}
321
322impl<TTrans> fmt::Debug for ListenersStream<TTrans>
323where
324    TTrans: Transport + fmt::Debug,
325{
326    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
327        f.debug_struct("ListenersStream")
328            .field("transport", &self.transport)
329            .field("listen_addrs", &self.listen_addrs().collect::<Vec<_>>())
330            .finish()
331    }
332}
333
334impl<TTrans> fmt::Debug for ListenersEvent<TTrans>
335where
336    TTrans: Transport,
337    TTrans::Error: fmt::Debug,
338{
339    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
340        match self {
341            ListenersEvent::NewAddress { listener_id, listen_addr } => f
342                .debug_struct("ListenersEvent::NewAddress")
343                .field("listener_id", listener_id)
344                .field("listen_addr", listen_addr)
345                .finish(),
346            ListenersEvent::AddressExpired { listener_id, listen_addr } => f
347                .debug_struct("ListenersEvent::AddressExpired")
348                .field("listener_id", listener_id)
349                .field("listen_addr", listen_addr)
350                .finish(),
351            ListenersEvent::Incoming { listener_id, local_addr, .. } => f
352                .debug_struct("ListenersEvent::Incoming")
353                .field("listener_id", listener_id)
354                .field("local_addr", local_addr)
355                .finish(),
356            ListenersEvent::Closed { listener_id, addresses, reason } => f
357                .debug_struct("ListenersEvent::Closed")
358                .field("listener_id", listener_id)
359                .field("addresses", addresses)
360                .field("reason", reason)
361                .finish(),
362            ListenersEvent::Error { listener_id, error } => f
363                .debug_struct("ListenersEvent::Error")
364                .field("listener_id", listener_id)
365                .field("error", error)
366                .finish()
367        }
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;
    use crate::transport;

    /// Dialing an address that is being listened on produces an `Incoming` event.
    #[test]
    fn incoming_event() {
        async_std::task::block_on(async move {
            let mem_transport = transport::MemoryTransport::default();

            let mut listeners = ListenersStream::new(mem_transport);
            listeners.listen_on("/memory/0".parse().unwrap()).unwrap();

            // The first event has to announce the address that is being listened on.
            let address = match listeners.next().await.unwrap() {
                ListenersEvent::NewAddress { listen_addr, .. } => listen_addr,
                _ => panic!("Was expecting the listen address to be reported"),
            };

            let dial_addr = address.clone();
            async_std::task::spawn(async move {
                mem_transport.dial(dial_addr).unwrap().await.unwrap();
            });

            if let ListenersEvent::Incoming { local_addr, send_back_addr, .. } =
                listeners.next().await.unwrap()
            {
                assert_eq!(local_addr, address);
                assert!(send_back_addr != address);
            } else {
                panic!()
            }
        });
    }

    /// A listener keeps being polled even after it has produced a
    /// `ListenerEvent::Error`.
    #[test]
    fn listener_event_error_isnt_fatal() {
        use std::io;

        /// Transport whose listener endlessly yields `ListenerEvent::Error`.
        #[derive(Clone)]
        struct DummyTrans;
        impl transport::Transport for DummyTrans {
            type Output = ();
            type Error = io::Error;
            type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, io::Error>, io::Error>>>>;
            type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
            type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;

            fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
                Ok(Box::pin(stream::unfold((), |()| async move {
                    let event = ListenerEvent::Error(io::Error::from(io::ErrorKind::Other));
                    Some((Ok(event), ()))
                })))
            }

            fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
                panic!()
            }

            fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
        }

        async_std::task::block_on(async move {
            let mut listeners = ListenersStream::new(DummyTrans);
            listeners.listen_on("/memory/0".parse().unwrap()).unwrap();

            // Every one of the next ten events has to be an `Error`.
            for _ in 0..10 {
                match listeners.next().await.unwrap() {
                    ListenersEvent::Error { .. } => continue,
                    _ => panic!(),
                }
            }
        });
    }

    /// A listener is closed once its stream itself produces an error.
    #[test]
    fn listener_error_is_fatal() {
        use std::io;

        /// Transport whose listener stream yields `Err` items.
        #[derive(Clone)]
        struct DummyTrans;
        impl transport::Transport for DummyTrans {
            type Output = ();
            type Error = io::Error;
            type Listener = Pin<Box<dyn Stream<Item = Result<ListenerEvent<Self::ListenerUpgrade, io::Error>, io::Error>>>>;
            type ListenerUpgrade = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;
            type Dial = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>>>>;

            fn listen_on(self, _: Multiaddr) -> Result<Self::Listener, transport::TransportError<Self::Error>> {
                Ok(Box::pin(stream::unfold((), |()| async move {
                    Some((Err(io::Error::from(io::ErrorKind::Other)), ()))
                })))
            }

            fn dial(self, _: Multiaddr) -> Result<Self::Dial, transport::TransportError<Self::Error>> {
                panic!()
            }

            fn address_translation(&self, _: &Multiaddr, _: &Multiaddr) -> Option<Multiaddr> { None }
        }

        async_std::task::block_on(async move {
            let mut listeners = ListenersStream::new(DummyTrans);
            listeners.listen_on("/memory/0".parse().unwrap()).unwrap();

            if let ListenersEvent::Closed { .. } = listeners.next().await.unwrap() {
                // Expected: the stream-level error closed the listener.
            } else {
                panic!()
            }
        });
    }
}