tet_libp2p_core/connection/
pool.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    ConnectedPoint,
23    PeerId,
24    connection::{
25        self,
26        Connected,
27        Connection,
28        ConnectionId,
29        ConnectionLimit,
30        ConnectionError,
31        ConnectionHandler,
32        IncomingInfo,
33        IntoConnectionHandler,
34        OutgoingInfo,
35        Substream,
36        PendingConnectionError,
37        manager::{self, Manager, ManagerConfig},
38    },
39    muxing::StreamMuxer,
40};
41use either::Either;
42use fnv::FnvHashMap;
43use futures::prelude::*;
44use smallvec::SmallVec;
45use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll};
46
47/// A connection `Pool` manages a set of connections for each peer.
48pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
49    local_id: PeerId,
50
51    /// The connection counter(s).
52    counters: ConnectionCounters,
53
54    /// The connection manager that handles the connection I/O for both
55    /// established and pending connections.
56    ///
57    /// For every established connection there is a corresponding entry in `established`.
58    manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
59
60    /// The managed connections of each peer that are currently considered
61    /// established, as witnessed by the associated `ConnectedPoint`.
62    established: FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
63
64    /// The pending connections that are currently being negotiated.
65    pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
66
67    /// Established connections that have been closed in the context of
68    /// a [`Pool::disconnect`] in order to emit a `ConnectionClosed`
69    /// event for each. Every `ConnectionEstablished` event must be
70    /// paired with (eventually) a `ConnectionClosed`.
71    disconnected: Vec<Disconnected>,
72}
73
74impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
75for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
76{
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
78        f.debug_struct("Pool")
79            .field("counters", &self.counters)
80            .finish()
81    }
82}
83
84impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> Unpin
85for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {}
86
87/// Event that can happen on the `Pool`.
88pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
89    /// A new connection has been established.
90    ConnectionEstablished {
91        connection: EstablishedConnection<'a, TInEvent>,
92        num_established: NonZeroU32,
93    },
94
95    /// An established connection was closed.
96    ///
97    /// A connection may close if
98    ///
99    ///   * it encounters an error, which includes the connection being
100    ///     closed by the remote. In this case `error` is `Some`.
101    ///   * it was actively closed by [`EstablishedConnection::start_close`],
102    ///     i.e. a successful, orderly close.
103    ///   * it was actively closed by [`Pool::disconnect`], i.e.
104    ///     dropped without an orderly close.
105    ///
106    ConnectionClosed {
107        id: ConnectionId,
108        /// Information about the connection that errored.
109        connected: Connected,
110        /// The error that occurred, if any. If `None`, the connection
111        /// was closed by the local peer.
112        error: Option<ConnectionError<THandlerErr>>,
113        /// A reference to the pool that used to manage the connection.
114        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
115        /// The remaining number of established connections to the same peer.
116        num_established: u32,
117    },
118
119    /// A connection attempt failed.
120    PendingConnectionError {
121        /// The ID of the failed connection.
122        id: ConnectionId,
123        /// The local endpoint of the failed connection.
124        endpoint: ConnectedPoint,
125        /// The error that occurred.
126        error: PendingConnectionError<TTransErr>,
127        /// The handler that was supposed to handle the connection,
128        /// if the connection failed before the handler was consumed.
129        handler: Option<THandler>,
130        /// The (expected) peer of the failed connection.
131        peer: Option<PeerId>,
132        /// A reference to the pool that managed the connection.
133        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
134    },
135
136    /// A node has produced an event.
137    ConnectionEvent {
138        /// The connection that has generated the event.
139        connection: EstablishedConnection<'a, TInEvent>,
140        /// The produced event.
141        event: TOutEvent,
142    },
143
144    /// The connection to a node has changed its address.
145    AddressChange {
146        /// The connection that has changed address.
147        connection: EstablishedConnection<'a, TInEvent>,
148        /// The new endpoint.
149        new_endpoint: ConnectedPoint,
150        /// The old endpoint.
151        old_endpoint: ConnectedPoint,
152    },
153}
154
155impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
156for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
157where
158    TOutEvent: fmt::Debug,
159    TTransErr: fmt::Debug,
160    THandlerErr: fmt::Debug,
161    TInEvent: fmt::Debug,
162{
163    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
164        match *self {
165            PoolEvent::ConnectionEstablished { ref connection, .. } => {
166                f.debug_tuple("PoolEvent::ConnectionEstablished")
167                    .field(connection)
168                    .finish()
169            },
170            PoolEvent::ConnectionClosed { ref id, ref connected, ref error, .. } => {
171                f.debug_struct("PoolEvent::ConnectionClosed")
172                    .field("id", id)
173                    .field("connected", connected)
174                    .field("error", error)
175                    .finish()
176            },
177            PoolEvent::PendingConnectionError { ref id, ref error, .. } => {
178                f.debug_struct("PoolEvent::PendingConnectionError")
179                    .field("id", id)
180                    .field("error", error)
181                    .finish()
182            },
183            PoolEvent::ConnectionEvent { ref connection, ref event } => {
184                f.debug_struct("PoolEvent::ConnectionEvent")
185                    .field("peer", connection.peer_id())
186                    .field("event", event)
187                    .finish()
188            },
189            PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
190                f.debug_struct("PoolEvent::AddressChange")
191                    .field("peer", connection.peer_id())
192                    .field("new_endpoint", new_endpoint)
193                    .field("old_endpoint", old_endpoint)
194                    .finish()
195            },
196        }
197    }
198}
199
200impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
201    Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
202{
203    /// Creates a new empty `Pool`.
204    pub fn new(
205        local_id: PeerId,
206        manager_config: ManagerConfig,
207        limits: ConnectionLimits
208    ) -> Self {
209        Pool {
210            local_id,
211            counters: ConnectionCounters::new(limits),
212            manager: Manager::new(manager_config),
213            established: Default::default(),
214            pending: Default::default(),
215            disconnected: Vec::new(),
216        }
217    }
218
219    /// Gets the dedicated connection counters.
220    pub fn counters(&self) -> &ConnectionCounters {
221        &self.counters
222    }
223
224    /// Adds a pending incoming connection to the pool in the form of a
225    /// `Future` that establishes and negotiates the connection.
226    ///
227    /// Returns an error if the limit of pending incoming connections
228    /// has been reached.
229    pub fn add_incoming<TFut, TMuxer>(
230        &mut self,
231        future: TFut,
232        handler: THandler,
233        info: IncomingInfo<'_>,
234    ) -> Result<ConnectionId, ConnectionLimit>
235    where
236        TFut: Future<
237            Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
238        > + Send + 'static,
239        THandler: IntoConnectionHandler + Send + 'static,
240        THandler::Handler: ConnectionHandler<
241            Substream = Substream<TMuxer>,
242            InEvent = TInEvent,
243            OutEvent = TOutEvent,
244            Error = THandlerErr
245        > + Send + 'static,
246        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
247        TTransErr: error::Error + Send + 'static,
248        THandlerErr: error::Error + Send + 'static,
249        TInEvent: Send + 'static,
250        TOutEvent: Send + 'static,
251        TMuxer: StreamMuxer + Send + Sync + 'static,
252        TMuxer::OutboundSubstream: Send + 'static,
253    {
254        self.counters.check_max_pending_incoming()?;
255        let endpoint = info.to_connected_point();
256        Ok(self.add_pending(future, handler, endpoint, None))
257    }
258
259    /// Adds a pending outgoing connection to the pool in the form of a `Future`
260    /// that establishes and negotiates the connection.
261    ///
262    /// Returns an error if the limit of pending outgoing connections
263    /// has been reached.
264    pub fn add_outgoing<TFut, TMuxer>(
265        &mut self,
266        future: TFut,
267        handler: THandler,
268        info: OutgoingInfo<'_>,
269    ) -> Result<ConnectionId, ConnectionLimit>
270    where
271        TFut: Future<
272            Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
273        > + Send + 'static,
274        THandler: IntoConnectionHandler + Send + 'static,
275        THandler::Handler: ConnectionHandler<
276            Substream = Substream<TMuxer>,
277            InEvent = TInEvent,
278            OutEvent = TOutEvent,
279            Error = THandlerErr
280        > + Send + 'static,
281        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
282        TTransErr: error::Error + Send + 'static,
283        THandlerErr: error::Error + Send + 'static,
284        TInEvent: Send + 'static,
285        TOutEvent: Send + 'static,
286        TMuxer: StreamMuxer + Send + Sync + 'static,
287        TMuxer::OutboundSubstream: Send + 'static,
288    {
289        self.counters.check_max_pending_outgoing()?;
290        let endpoint = info.to_connected_point();
291        Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
292    }
293
294    /// Adds a pending connection to the pool in the form of a
295    /// `Future` that establishes and negotiates the connection.
296    fn add_pending<TFut, TMuxer>(
297        &mut self,
298        future: TFut,
299        handler: THandler,
300        endpoint: ConnectedPoint,
301        peer: Option<PeerId>,
302    ) -> ConnectionId
303    where
304        TFut: Future<
305            Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
306        > + Send + 'static,
307        THandler: IntoConnectionHandler + Send + 'static,
308        THandler::Handler: ConnectionHandler<
309            Substream = Substream<TMuxer>,
310            InEvent = TInEvent,
311            OutEvent = TOutEvent,
312            Error = THandlerErr
313        > + Send + 'static,
314        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
315        TTransErr: error::Error + Send + 'static,
316        THandlerErr: error::Error + Send + 'static,
317        TInEvent: Send + 'static,
318        TOutEvent: Send + 'static,
319        TMuxer: StreamMuxer + Send + Sync + 'static,
320        TMuxer::OutboundSubstream: Send + 'static,
321    {
322        // Validate the received peer ID as the last step of the pending connection
323        // future, so that these errors can be raised before the `handler` is consumed
324        // by the background task, which happens when this future resolves to an
325        // "established" connection.
326        let future = future.and_then({
327            let endpoint = endpoint.clone();
328            let expected_peer = peer.clone();
329            let local_id = self.local_id.clone();
330            move |(peer_id, muxer)| {
331                if let Some(peer) = expected_peer {
332                    if peer != peer_id {
333                        return future::err(PendingConnectionError::InvalidPeerId)
334                    }
335                }
336
337                if local_id == peer_id {
338                    return future::err(PendingConnectionError::InvalidPeerId)
339                }
340
341                let connected = Connected { peer_id, endpoint };
342                future::ready(Ok((connected, muxer)))
343            }
344        });
345
346        let id = self.manager.add_pending(future, handler);
347        self.counters.inc_pending(&endpoint);
348        self.pending.insert(id, (endpoint, peer));
349        id
350    }
351
352    /// Adds an existing established connection to the pool.
353    ///
354    /// Returns the assigned connection ID on success. An error is returned
355    /// if the configured maximum number of established connections for the
356    /// connected peer has been reached.
357    pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected)
358        -> Result<ConnectionId, ConnectionLimit>
359    where
360        THandler: IntoConnectionHandler + Send + 'static,
361        THandler::Handler: ConnectionHandler<
362            Substream = connection::Substream<TMuxer>,
363            InEvent = TInEvent,
364            OutEvent = TOutEvent,
365            Error = THandlerErr
366        > + Send + 'static,
367        <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
368        TTransErr: error::Error + Send + 'static,
369        THandlerErr: error::Error + Send + 'static,
370        TInEvent: Send + 'static,
371        TOutEvent: Send + 'static,
372        TMuxer: StreamMuxer + Send + Sync + 'static,
373        TMuxer::OutboundSubstream: Send + 'static,
374    {
375        self.counters.check_max_established(&i.endpoint)?;
376        self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
377        let id = self.manager.add(c, i.clone());
378        self.counters.inc_established(&i.endpoint);
379        self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint);
380        Ok(id)
381    }
382
383    /// Gets an entry representing a connection in the pool.
384    ///
385    /// Returns `None` if the pool has no connection with the given ID.
386    pub fn get(&mut self, id: ConnectionId)
387        -> Option<PoolConnection<'_, TInEvent>>
388    {
389        match self.manager.entry(id) {
390            Some(manager::Entry::Established(entry)) =>
391                Some(PoolConnection::Established(EstablishedConnection {
392                    entry
393                })),
394            Some(manager::Entry::Pending(entry)) =>
395                Some(PoolConnection::Pending(PendingConnection {
396                    entry,
397                    pending: &mut self.pending,
398                    counters: &mut self.counters,
399                })),
400            None => None
401        }
402    }
403
404    /// Gets an established connection from the pool by ID.
405    pub fn get_established(&mut self, id: ConnectionId)
406        -> Option<EstablishedConnection<'_, TInEvent>>
407    {
408        match self.get(id) {
409            Some(PoolConnection::Established(c)) => Some(c),
410            _ => None
411        }
412    }
413
414    /// Gets a pending outgoing connection by ID.
415    pub fn get_outgoing(&mut self, id: ConnectionId)
416        -> Option<PendingConnection<'_, TInEvent>>
417    {
418        match self.pending.get(&id) {
419            Some((ConnectedPoint::Dialer { .. }, _peer)) =>
420                match self.manager.entry(id) {
421                    Some(manager::Entry::Pending(entry)) =>
422                        Some(PendingConnection {
423                            entry,
424                            pending: &mut self.pending,
425                            counters: &mut self.counters,
426                        }),
427                    _ => unreachable!("by consistency of `self.pending` with `self.manager`")
428                }
429            _ => None
430        }
431    }
432
433    /// Returns true if we are connected to the given peer.
434    ///
435    /// This will return true only after a `NodeReached` event has been produced by `poll()`.
436    pub fn is_connected(&self, id: &PeerId) -> bool {
437        self.established.contains_key(id)
438    }
439
440    /// Returns the number of connected peers, i.e. those with at least one
441    /// established connection in the pool.
442    pub fn num_peers(&self) -> usize {
443        self.established.len()
444    }
445
446    /// (Forcefully) close all connections to the given peer.
447    ///
448    /// All connections to the peer, whether pending or established are
449    /// dropped asap and no more events from these connections are emitted
450    /// by the pool effective immediately.
451    ///
452    /// > **Note**: Established connections are dropped without performing
453    /// > an orderly close. See [`EstablishedConnection::start_close`] for
454    /// > performing such an orderly close.
455    pub fn disconnect(&mut self, peer: &PeerId) {
456        if let Some(conns) = self.established.get(peer) {
457            // Count upwards because we push to / pop from the end. See also `Pool::poll`.
458            let mut num_established = 0;
459            for (&id, endpoint) in conns.iter() {
460                if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
461                    let connected = e.remove();
462                    self.disconnected.push(Disconnected {
463                        id, connected, num_established
464                    });
465                    num_established += 1;
466                }
467                self.counters.dec_established(endpoint);
468            }
469        }
470        self.established.remove(peer);
471
472        let mut aborted = Vec::new();
473        for (&id, (_endpoint, peer2)) in &self.pending {
474            if Some(peer) == peer2.as_ref() {
475                if let Some(manager::Entry::Pending(e)) = self.manager.entry(id) {
476                    e.abort();
477                    aborted.push(id);
478                }
479            }
480        }
481        for id in aborted {
482            if let Some((endpoint, _)) = self.pending.remove(&id) {
483                self.counters.dec_pending(&endpoint);
484            }
485        }
486    }
487
488    /// Counts the number of established connections to the given peer.
489    pub fn num_peer_established(&self, peer: &PeerId) -> u32 {
490        num_peer_established(&self.established, peer)
491    }
492
493    /// Returns an iterator over all established connections of `peer`.
494    pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId)
495        -> EstablishedConnectionIter<'a,
496            impl Iterator<Item = ConnectionId>,
497            TInEvent,
498            TOutEvent,
499            THandler,
500            TTransErr,
501            THandlerErr>
502    {
503        let ids = self.iter_peer_established_info(peer)
504            .map(|(id, _endpoint)| *id)
505            .collect::<SmallVec<[ConnectionId; 10]>>()
506            .into_iter();
507
508        EstablishedConnectionIter { pool: self, ids }
509    }
510
511    /// Returns an iterator for information on all pending incoming connections.
512    pub fn iter_pending_incoming(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
513        self.iter_pending_info()
514            .filter_map(|(_, ref endpoint, _)| {
515                match endpoint {
516                    ConnectedPoint::Listener { local_addr, send_back_addr } => {
517                        Some(IncomingInfo { local_addr, send_back_addr })
518                    },
519                    ConnectedPoint::Dialer { .. } => None,
520                }
521            })
522    }
523
524    /// Returns an iterator for information on all pending outgoing connections.
525    pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_>> {
526        self.iter_pending_info()
527            .filter_map(|(_, ref endpoint, ref peer_id)| {
528                match endpoint {
529                    ConnectedPoint::Listener { .. } => None,
530                    ConnectedPoint::Dialer { address } =>
531                        Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
532                }
533            })
534    }
535
536    /// Returns an iterator over all connection IDs and associated endpoints
537    /// of established connections to `peer` known to the pool.
538    pub fn iter_peer_established_info(&self, peer: &PeerId)
539        -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
540    {
541        match self.established.get(peer) {
542            Some(conns) => Either::Left(conns.iter()),
543            None => Either::Right(std::iter::empty())
544        }
545    }
546
547    /// Returns an iterator over all pending connection IDs together
548    /// with associated endpoints and expected peer IDs in the pool.
549    pub fn iter_pending_info(&self)
550        -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<PeerId>)> + '_
551    {
552        self.pending.iter().map(|(id, (endpoint, info))| (id, endpoint, info))
553    }
554
555    /// Returns an iterator over all connected peers, i.e. those that have
556    /// at least one established connection in the pool.
557    pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
558        self.established.keys()
559    }
560
561    /// Polls the connection pool for events.
562    ///
563    /// > **Note**: We use a regular `poll` method instead of implementing `Stream`,
564    /// > because we want the `Pool` to stay borrowed if necessary.
565    pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
566        PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
567    >     {
568        // Drain events resulting from forced disconnections.
569        //
570        // Note: The `Disconnected` entries in `self.disconnected`
571        // are inserted in ascending order of the remaining `num_established`
572        // connections. Thus we `pop()` them off from the end to emit the
573        // events in an order that properly counts down `num_established`.
574        // See also `Pool::disconnect`.
575        if let Some(Disconnected {
576            id, connected, num_established
577        }) = self.disconnected.pop() {
578            return Poll::Ready(PoolEvent::ConnectionClosed {
579                id,
580                connected,
581                num_established,
582                error: None,
583                pool: self,
584            })
585        }
586
587        // Poll the connection `Manager`.
588        loop {
589            let item = match self.manager.poll(cx) {
590                Poll::Ready(item) => item,
591                Poll::Pending => return Poll::Pending,
592            };
593
594            match item {
595                manager::Event::PendingConnectionError { id, error, handler } => {
596                    if let Some((endpoint, peer)) = self.pending.remove(&id) {
597                        self.counters.dec_pending(&endpoint);
598                        return Poll::Ready(PoolEvent::PendingConnectionError {
599                            id,
600                            endpoint,
601                            error,
602                            handler: Some(handler),
603                            peer,
604                            pool: self
605                        })
606                    }
607                },
608                manager::Event::ConnectionClosed { id, connected, error } => {
609                    let num_established =
610                        if let Some(conns) = self.established.get_mut(&connected.peer_id) {
611                            if let Some(endpoint) = conns.remove(&id) {
612                                self.counters.dec_established(&endpoint);
613                            }
614                            u32::try_from(conns.len()).unwrap()
615                        } else {
616                            0
617                        };
618                    if num_established == 0 {
619                        self.established.remove(&connected.peer_id);
620                    }
621                    return Poll::Ready(PoolEvent::ConnectionClosed {
622                        id, connected, error, num_established, pool: self
623                    })
624                }
625                manager::Event::ConnectionEstablished { entry } => {
626                    let id = entry.id();
627                    if let Some((endpoint, peer)) = self.pending.remove(&id) {
628                        self.counters.dec_pending(&endpoint);
629
630                        // Check general established connection limit.
631                        if let Err(e) = self.counters.check_max_established(&endpoint) {
632                            let connected = entry.remove();
633                            return Poll::Ready(PoolEvent::PendingConnectionError {
634                                id,
635                                endpoint: connected.endpoint,
636                                error: PendingConnectionError::ConnectionLimit(e),
637                                handler: None,
638                                peer,
639                                pool: self
640                            })
641                        }
642
643                        // Check per-peer established connection limit.
644                        let current = num_peer_established(&self.established, &entry.connected().peer_id);
645                        if let Err(e) = self.counters.check_max_established_per_peer(current) {
646                            let connected = entry.remove();
647                            return Poll::Ready(PoolEvent::PendingConnectionError {
648                                id,
649                                endpoint: connected.endpoint,
650                                error: PendingConnectionError::ConnectionLimit(e),
651                                handler: None,
652                                peer,
653                                pool: self
654                            })
655                        }
656
657                        // Peer ID checks must already have happened. See `add_pending`.
658                        if cfg!(debug_assertions) {
659                            if self.local_id == entry.connected().peer_id {
660                                panic!("Unexpected local peer ID for remote.");
661                            }
662                            if let Some(peer) = peer {
663                                if peer != entry.connected().peer_id {
664                                    panic!("Unexpected peer ID mismatch.");
665                                }
666                            }
667                        }
668
669                        // Add the connection to the pool.
670                        let peer = entry.connected().peer_id.clone();
671                        let conns = self.established.entry(peer).or_default();
672                        let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
673                            .expect("n + 1 is always non-zero; qed");
674                        self.counters.inc_established(&endpoint);
675                        conns.insert(id, endpoint);
676                        match self.get(id) {
677                            Some(PoolConnection::Established(connection)) =>
678                                return Poll::Ready(PoolEvent::ConnectionEstablished {
679                                    connection, num_established
680                                }),
681                            _ => unreachable!("since `entry` is an `EstablishedEntry`.")
682                        }
683                    }
684                },
685                manager::Event::ConnectionEvent { entry, event } => {
686                    let id = entry.id();
687                    match self.get(id) {
688                        Some(PoolConnection::Established(connection)) =>
689                            return Poll::Ready(PoolEvent::ConnectionEvent {
690                                connection,
691                                event,
692                            }),
693                        _ => unreachable!("since `entry` is an `EstablishedEntry`.")
694                    }
695                },
696                manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
697                    let id = entry.id();
698
699                    match self.established.get_mut(&entry.connected().peer_id) {
700                        Some(list) => *list.get_mut(&id)
701                            .expect("state inconsistency: entry is `EstablishedEntry` but absent \
702                                from `established`") = new_endpoint.clone(),
703                        None => unreachable!("since `entry` is an `EstablishedEntry`.")
704                    };
705
706                    match self.get(id) {
707                        Some(PoolConnection::Established(connection)) =>
708                            return Poll::Ready(PoolEvent::AddressChange {
709                                connection,
710                                new_endpoint,
711                                old_endpoint,
712                            }),
713                        _ => unreachable!("since `entry` is an `EstablishedEntry`.")
714                    }
715                },
716            }
717        }
718    }
719
720}
721
722/// A connection in a [`Pool`].
723pub enum PoolConnection<'a, TInEvent> {
724    Pending(PendingConnection<'a, TInEvent>),
725    Established(EstablishedConnection<'a, TInEvent>),
726}
727
728/// A pending connection in a pool.
729pub struct PendingConnection<'a, TInEvent> {
730    entry: manager::PendingEntry<'a, TInEvent>,
731    pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
732    counters: &'a mut ConnectionCounters,
733}
734
735impl<TInEvent>
736    PendingConnection<'_, TInEvent>
737{
738    /// Returns the local connection ID.
739    pub fn id(&self) -> ConnectionId {
740        self.entry.id()
741    }
742
743    /// Returns the (expected) identity of the remote peer, if known.
744    pub fn peer_id(&self) -> &Option<PeerId> {
745        &self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").1
746    }
747
748    /// Returns information about this endpoint of the connection.
749    pub fn endpoint(&self) -> &ConnectedPoint {
750        &self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").0
751    }
752
753    /// Aborts the connection attempt, closing the connection.
754    pub fn abort(self) {
755        let endpoint = self.pending.remove(&self.entry.id()).expect("`entry` is a pending entry").0;
756        self.counters.dec_pending(&endpoint);
757        self.entry.abort();
758    }
759}
760
761/// An established connection in a pool.
762pub struct EstablishedConnection<'a, TInEvent> {
763    entry: manager::EstablishedEntry<'a, TInEvent>,
764}
765
766impl<TInEvent> fmt::Debug
767for EstablishedConnection<'_, TInEvent>
768where
769    TInEvent: fmt::Debug,
770{
771    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
772        f.debug_struct("EstablishedConnection")
773            .field("entry", &self.entry)
774            .finish()
775    }
776}
777
778impl<TInEvent> EstablishedConnection<'_, TInEvent> {
779    pub fn connected(&self) -> &Connected {
780        self.entry.connected()
781    }
782
783    /// Returns information about the connected endpoint.
784    pub fn endpoint(&self) -> &ConnectedPoint {
785        &self.entry.connected().endpoint
786    }
787
788    /// Returns the identity of the connected peer.
789    pub fn peer_id(&self) -> &PeerId {
790        &self.entry.connected().peer_id
791    }
792
793    /// Returns the local connection ID.
794    pub fn id(&self) -> ConnectionId {
795        self.entry.id()
796    }
797
798    /// (Asynchronously) sends an event to the connection handler.
799    ///
800    /// If the handler is not ready to receive the event, either because
801    /// it is busy or the connection is about to close, the given event
802    /// is returned with an `Err`.
803    ///
804    /// If execution of this method is preceded by successful execution of
805    /// `poll_ready_notify_handler` without another intervening execution
806    /// of `notify_handler`, it only fails if the connection is now about
807    /// to close.
808    pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
809        self.entry.notify_handler(event)
810    }
811
812    /// Checks if `notify_handler` is ready to accept an event.
813    ///
814    /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
815    ///
816    /// Returns `Err(())` if the background task associated with the connection
817    /// is terminating and the connection is about to close.
818    pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
819        self.entry.poll_ready_notify_handler(cx)
820    }
821
822    /// Initiates a graceful close of the connection.
823    ///
824    /// Has no effect if the connection is already closing.
825    pub fn start_close(self) {
826        self.entry.start_close()
827    }
828}
829
830/// An iterator over established connections in a pool.
831pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
832    pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
833    ids: I
834}
835
836// Note: Ideally this would be an implementation of `Iterator`, but that
837// requires GATs (cf. https://github.com/rust-lang/rust/issues/44265) and
838// a different definition of `Iterator`.
839impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
840    EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
841where
842    I: Iterator<Item = ConnectionId>
843{
844    /// Obtains the next connection, if any.
845    pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
846    {
847        while let Some(id) = self.ids.next() {
848            if self.pool.manager.is_established(&id) { // (*)
849                match self.pool.manager.entry(id) {
850                    Some(manager::Entry::Established(entry)) => {
851                        return Some(EstablishedConnection { entry })
852                    }
853                    _ => panic!("Established entry not found in manager.") // see (*)
854                }
855            }
856        }
857        None
858    }
859
860    /// Turns the iterator into an iterator over just the connection IDs.
861    pub fn into_ids(self) -> impl Iterator<Item = ConnectionId> {
862        self.ids
863    }
864
865    /// Returns the first connection, if any, consuming the iterator.
866    pub fn into_first<'b>(mut self)
867        -> Option<EstablishedConnection<'b, TInEvent>>
868    where 'a: 'b
869    {
870        while let Some(id) = self.ids.next() {
871            if self.pool.manager.is_established(&id) { // (*)
872                match self.pool.manager.entry(id) {
873                    Some(manager::Entry::Established(entry)) => {
874                        return Some(EstablishedConnection { entry })
875                    }
876                    _ => panic!("Established entry not found in manager.") // see (*)
877                }
878            }
879        }
880        None
881    }
882}
883
884/// Network connection information.
885#[derive(Debug, Clone)]
886pub struct ConnectionCounters {
887    /// The effective connection limits.
888    limits: ConnectionLimits,
889    /// The current number of incoming connections.
890    pending_incoming: u32,
891    /// The current number of outgoing connections.
892    pending_outgoing: u32,
893    /// The current number of established inbound connections.
894    established_incoming: u32,
895    /// The current number of established outbound connections.
896    established_outgoing: u32,
897}
898
899impl ConnectionCounters {
900    fn new(limits: ConnectionLimits) -> Self {
901        Self {
902            limits,
903            pending_incoming: 0,
904            pending_outgoing: 0,
905            established_incoming: 0,
906            established_outgoing: 0,
907        }
908    }
909
910    /// The effective connection limits.
911    pub fn limits(&self) -> &ConnectionLimits {
912        &self.limits
913    }
914
915    /// The total number of connections, both pending and established.
916    pub fn num_connections(&self) -> u32 {
917        self.num_pending() + self.num_established()
918    }
919
920    /// The total number of pending connections, both incoming and outgoing.
921    pub fn num_pending(&self) -> u32 {
922        self.pending_incoming + self.pending_outgoing
923    }
924
925    /// The number of incoming connections being established.
926    pub fn num_pending_incoming(&self) -> u32 {
927        self.pending_incoming
928    }
929
930    /// The number of outgoing connections being established.
931    pub fn num_pending_outgoing(&self) -> u32 {
932        self.pending_outgoing
933    }
934
935    /// The number of established incoming connections.
936    pub fn num_established_incoming(&self) -> u32 {
937        self.established_incoming
938    }
939
940    /// The number of established outgoing connections.
941    pub fn num_established_outgoing(&self) -> u32 {
942        self.established_outgoing
943    }
944
945    /// The total number of established connections.
946    pub fn num_established(&self) -> u32 {
947        self.established_outgoing + self.established_incoming
948    }
949
950    fn inc_pending(&mut self, endpoint: &ConnectedPoint) {
951        match endpoint {
952            ConnectedPoint::Dialer { .. } => { self.pending_outgoing += 1; }
953            ConnectedPoint::Listener { .. } => { self.pending_incoming += 1; }
954        }
955    }
956
957    fn dec_pending(&mut self, endpoint: &ConnectedPoint) {
958        match endpoint {
959            ConnectedPoint::Dialer { .. } => { self.pending_outgoing -= 1; }
960            ConnectedPoint::Listener { .. } => { self.pending_incoming -= 1; }
961        }
962    }
963
964    fn inc_established(&mut self, endpoint: &ConnectedPoint) {
965        match endpoint {
966            ConnectedPoint::Dialer { .. } => { self.established_outgoing += 1; }
967            ConnectedPoint::Listener { .. } => { self.established_incoming += 1; }
968        }
969    }
970
971    fn dec_established(&mut self, endpoint: &ConnectedPoint) {
972        match endpoint {
973            ConnectedPoint::Dialer { .. } => { self.established_outgoing -= 1; }
974            ConnectedPoint::Listener { .. } => { self.established_incoming -= 1; }
975        }
976    }
977
978    fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
979        Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
980    }
981
982    fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
983        Self::check(self.pending_incoming, self.limits.max_pending_incoming)
984    }
985
986    fn check_max_established(&self, endpoint: &ConnectedPoint)
987        -> Result<(), ConnectionLimit>
988    {
989        match endpoint {
990            ConnectedPoint::Dialer { .. } =>
991                Self::check(self.established_outgoing, self.limits.max_established_outgoing),
992            ConnectedPoint::Listener { .. } => {
993                Self::check(self.established_incoming, self.limits.max_established_incoming)
994            }
995        }
996    }
997
998    fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
999        Self::check(current, self.limits.max_established_per_peer)
1000    }
1001
1002    fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
1003        if let Some(limit) = limit {
1004            if current >= limit {
1005                return Err(ConnectionLimit { limit, current })
1006            }
1007        }
1008        Ok(())
1009    }
1010
1011}
1012
1013/// Counts the number of established connections to the given peer.
1014fn num_peer_established(
1015    established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
1016    peer: &PeerId
1017) -> u32 {
1018    established.get(peer).map_or(0, |conns|
1019        u32::try_from(conns.len())
1020            .expect("Unexpectedly large number of connections for a peer."))
1021}
1022
1023/// The configurable connection limits.
1024///
1025/// By default no connection limits apply.
1026#[derive(Debug, Clone, Default)]
1027pub struct ConnectionLimits {
1028    max_pending_incoming: Option<u32>,
1029    max_pending_outgoing: Option<u32>,
1030    max_established_incoming: Option<u32>,
1031    max_established_outgoing: Option<u32>,
1032    max_established_per_peer: Option<u32>,
1033}
1034
1035impl ConnectionLimits {
1036    /// Configures the maximum number of concurrently incoming connections being established.
1037    pub fn with_max_pending_incoming(mut self, limit: Option<u32>) -> Self {
1038        self.max_pending_incoming = limit;
1039        self
1040    }
1041
1042    /// Configures the maximum number of concurrently outgoing connections being established.
1043    pub fn with_max_pending_outgoing(mut self, limit: Option<u32>) -> Self {
1044        self.max_pending_outgoing = limit;
1045        self
1046    }
1047
1048    /// Configures the maximum number of concurrent established inbound connections.
1049    pub fn with_max_established_incoming(mut self, limit: Option<u32>) -> Self {
1050        self.max_established_incoming = limit;
1051        self
1052    }
1053
1054    /// Configures the maximum number of concurrent established outbound connections.
1055    pub fn with_max_established_outgoing(mut self, limit: Option<u32>) -> Self {
1056        self.max_established_outgoing = limit;
1057        self
1058    }
1059
1060    /// Configures the maximum number of concurrent established connections per peer,
1061    /// regardless of direction (incoming or outgoing).
1062    pub fn with_max_established_per_peer(mut self, limit: Option<u32>) -> Self {
1063        self.max_established_per_peer = limit;
1064        self
1065    }
1066}
1067
1068/// Information about a former established connection to a peer
1069/// that was dropped via [`Pool::disconnect`].
1070struct Disconnected {
1071    /// The unique identifier of the dropped connection.
1072    id: ConnectionId,
1073    /// Information about the dropped connection.
1074    connected: Connected,
1075    /// The remaining number of established connections
1076    /// to the same peer.
1077    num_established: u32,
1078}