libp2p_core/
network.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21mod event;
22pub mod peer;
23
24pub use crate::connection::{ConnectionLimits, ConnectionCounters};
25pub use event::{NetworkEvent, IncomingConnection};
26pub use peer::Peer;
27
28use crate::{
29    ConnectedPoint,
30    Executor,
31    Multiaddr,
32    PeerId,
33    connection::{
34        ConnectionId,
35        ConnectionLimit,
36        ConnectionHandler,
37        IntoConnectionHandler,
38        IncomingInfo,
39        OutgoingInfo,
40        ListenersEvent,
41        ListenerId,
42        ListenersStream,
43        PendingConnectionError,
44        Substream,
45        manager::ManagerConfig,
46        pool::{Pool, PoolEvent},
47    },
48    muxing::StreamMuxer,
49    transport::{Transport, TransportError},
50};
51use fnv::{FnvHashMap};
52use futures::{prelude::*, future};
53use smallvec::SmallVec;
54use std::{
55    collections::hash_map,
56    convert::TryFrom as _,
57    error,
58    fmt,
59    num::NonZeroUsize,
60    pin::Pin,
61    task::{Context, Poll},
62};
63
64/// Implementation of `Stream` that handles the nodes.
65pub struct Network<TTrans, TInEvent, TOutEvent, THandler>
66where
67    TTrans: Transport,
68    THandler: IntoConnectionHandler,
69{
70    /// The local peer ID.
71    local_peer_id: PeerId,
72
73    /// Listeners for incoming connections.
74    listeners: ListenersStream<TTrans>,
75
76    /// The nodes currently active.
77    pool: Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
78        <THandler::Handler as ConnectionHandler>::Error>,
79
80    /// The ongoing dialing attempts.
81    ///
82    /// There may be multiple ongoing dialing attempts to the same peer.
83    /// Each dialing attempt is associated with a new connection and hence
84    /// a new connection ID.
85    ///
86    /// > **Note**: `dialing` must be consistent with the pending outgoing
87    /// > connections in `pool`. That is, for every entry in `dialing`
88    /// > there must exist a pending outgoing connection in `pool` with
89    /// > the same connection ID. This is ensured by the implementation of
90    /// > `Network` (see `dial_peer_impl` and `on_connection_failed`)
91    /// > together with the implementation of `DialingAttempt::abort`.
92    dialing: FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
93}
94
95impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
96    Network<TTrans, TInEvent, TOutEvent, THandler>
97where
98    TTrans: fmt::Debug + Transport,
99    THandler: fmt::Debug + ConnectionHandler,
100{
101    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
102        f.debug_struct("ReachAttempts")
103            .field("local_peer_id", &self.local_peer_id)
104            .field("listeners", &self.listeners)
105            .field("peers", &self.pool)
106            .field("dialing", &self.dialing)
107            .finish()
108    }
109}
110
111impl<TTrans, TInEvent, TOutEvent, THandler> Unpin for
112    Network<TTrans, TInEvent, TOutEvent, THandler>
113where
114    TTrans: Transport,
115    THandler: IntoConnectionHandler,
116{
117}
118
119impl<TTrans, TInEvent, TOutEvent, THandler>
120    Network<TTrans, TInEvent, TOutEvent, THandler>
121where
122    TTrans: Transport,
123    THandler: IntoConnectionHandler,
124{
125    fn disconnect(&mut self, peer: &PeerId) {
126        self.pool.disconnect(peer);
127        self.dialing.remove(peer);
128    }
129}
130
131impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler>
132    Network<TTrans, TInEvent, TOutEvent, THandler>
133where
134    TTrans: Transport + Clone,
135    TMuxer: StreamMuxer,
136    THandler: IntoConnectionHandler + Send + 'static,
137    THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
138    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
139    <THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
140{
141    /// Creates a new node events stream.
142    pub fn new(
143        transport: TTrans,
144        local_peer_id: PeerId,
145        config: NetworkConfig,
146    ) -> Self {
147        Network {
148            local_peer_id,
149            listeners: ListenersStream::new(transport),
150            pool: Pool::new(local_peer_id, config.manager_config, config.limits),
151            dialing: Default::default(),
152        }
153    }
154
155    /// Returns the transport passed when building this object.
156    pub fn transport(&self) -> &TTrans {
157        self.listeners.transport()
158    }
159
160    /// Start listening on the given multiaddress.
161    pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>> {
162        self.listeners.listen_on(addr)
163    }
164
165    /// Remove a previously added listener.
166    ///
167    /// Returns `Ok(())` if a listener with this ID was in the list.
168    pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
169        self.listeners.remove_listener(id)
170    }
171
172    /// Returns an iterator that produces the list of addresses we are listening on.
173    pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
174        self.listeners.listen_addrs()
175    }
176
177    /// Maps the given `observed_addr`, representing an address of the local
178    /// node observed by a remote peer, onto the locally known listen addresses
179    /// to yield one or more addresses of the local node that may be publicly
180    /// reachable.
181    ///
182    /// I.e. this method incorporates the view of other peers into the listen
183    /// addresses seen by the local node to account for possible IP and port
184    /// mappings performed by intermediate network devices in an effort to
185    /// obtain addresses for the local peer that are also reachable for peers
186    /// other than the peer who reported the `observed_addr`.
187    ///
188    /// The translation is transport-specific. See [`Transport::address_translation`].
189    pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
190        -> impl Iterator<Item = Multiaddr> + 'a
191    where
192        TMuxer: 'a,
193        THandler: 'a,
194    {
195        let transport = self.listeners.transport();
196        let mut addrs: Vec<_> = self.listen_addrs()
197            .filter_map(move |server| transport.address_translation(server, observed_addr))
198            .collect();
199
200        // remove duplicates
201        addrs.sort_unstable();
202        addrs.dedup();
203
204        addrs.into_iter()
205    }
206
207    /// Returns the peer id of the local node.
208    pub fn local_peer_id(&self) -> &PeerId {
209        &self.local_peer_id
210    }
211
212    /// Dials a multiaddress without expecting a particular remote peer ID.
213    ///
214    /// The given `handler` will be used to create the
215    /// [`Connection`](crate::connection::Connection) upon success and the
216    /// connection ID is returned.
217    pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
218        -> Result<ConnectionId, ConnectionLimit>
219    where
220        TTrans: Transport<Output = (PeerId, TMuxer)>,
221        TTrans::Error: Send + 'static,
222        TTrans::Dial: Send + 'static,
223        TMuxer: Send + Sync + 'static,
224        TMuxer::OutboundSubstream: Send,
225        TInEvent: Send + 'static,
226        TOutEvent: Send + 'static,
227    {
228        let info = OutgoingInfo { address, peer_id: None };
229        match self.transport().clone().dial(address.clone()) {
230            Ok(f) => {
231                let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
232                self.pool.add_outgoing(f, handler, info)
233            }
234            Err(err) => {
235                let f = future::err(PendingConnectionError::Transport(err));
236                self.pool.add_outgoing(f, handler, info)
237            }
238        }
239    }
240
241    /// Returns information about the state of the `Network`.
242    pub fn info(&self) -> NetworkInfo {
243        let num_peers = self.pool.num_peers();
244        let connection_counters = self.pool.counters().clone();
245        NetworkInfo {
246            num_peers,
247            connection_counters,
248        }
249    }
250
251    /// Returns an iterator for information on all pending incoming connections.
252    pub fn incoming_info(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
253        self.pool.iter_pending_incoming()
254    }
255
256    /// Returns the list of addresses we're currently dialing without knowing the `PeerId` of.
257    pub fn unknown_dials(&self) -> impl Iterator<Item = &Multiaddr> {
258        self.pool.iter_pending_outgoing()
259            .filter_map(|info| {
260                if info.peer_id.is_none() {
261                    Some(info.address)
262                } else {
263                    None
264                }
265            })
266    }
267
268    /// Returns a list of all connected peers, i.e. peers to whom the `Network`
269    /// has at least one established connection.
270    pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
271        self.pool.iter_connected()
272    }
273
274    /// Checks whether the network has an established connection to a peer.
275    pub fn is_connected(&self, peer: &PeerId) -> bool {
276        self.pool.is_connected(peer)
277    }
278
279    /// Checks whether the network has an ongoing dialing attempt to a peer.
280    pub fn is_dialing(&self, peer: &PeerId) -> bool {
281        self.dialing.contains_key(peer)
282    }
283
284    /// Checks whether the network has neither an ongoing dialing attempt,
285    /// nor an established connection to a peer.
286    pub fn is_disconnected(&self, peer: &PeerId) -> bool {
287        !self.is_connected(peer) && !self.is_dialing(peer)
288    }
289
290    /// Returns a list of all the peers to whom a new outgoing connection
291    /// is currently being established.
292    pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
293        self.dialing.keys()
294    }
295
296    /// Obtains a view of a [`Peer`] with the given ID in the network.
297    pub fn peer(&mut self, peer_id: PeerId)
298        -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
299    {
300        Peer::new(self, peer_id)
301    }
302
303    /// Accepts a pending incoming connection obtained via [`NetworkEvent::IncomingConnection`],
304    /// adding it to the `Network`s connection pool subject to the configured limits.
305    ///
306    /// Once the connection is established and all transport protocol upgrades
307    /// completed, the connection is associated with the provided `handler`.
308    pub fn accept(
309        &mut self,
310        connection: IncomingConnection<TTrans::ListenerUpgrade>,
311        handler: THandler,
312    ) -> Result<ConnectionId, ConnectionLimit>
313    where
314        TInEvent: Send + 'static,
315        TOutEvent: Send + 'static,
316        TMuxer: StreamMuxer + Send + Sync + 'static,
317        TMuxer::OutboundSubstream: Send,
318        TTrans: Transport<Output = (PeerId, TMuxer)>,
319        TTrans::Error: Send + 'static,
320        TTrans::ListenerUpgrade: Send + 'static,
321    {
322        let upgrade = connection.upgrade.map_err(|err|
323            PendingConnectionError::Transport(TransportError::Other(err)));
324        let info = IncomingInfo {
325            local_addr: &connection.local_addr,
326            send_back_addr: &connection.send_back_addr,
327        };
328        self.pool.add_incoming(upgrade, handler, info)
329    }
330
331    /// Provides an API similar to `Stream`, except that it cannot error.
332    pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>>
333    where
334        TTrans: Transport<Output = (PeerId, TMuxer)>,
335        TTrans::Error: Send + 'static,
336        TTrans::Dial: Send + 'static,
337        TTrans::ListenerUpgrade: Send + 'static,
338        TMuxer: Send + Sync + 'static,
339        TMuxer::OutboundSubstream: Send,
340        TInEvent: Send + 'static,
341        TOutEvent: Send + 'static,
342        THandler: IntoConnectionHandler + Send + 'static,
343        THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
344        <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
345    {
346        // Poll the listener(s) for new connections.
347        match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
348            Poll::Pending => (),
349            Poll::Ready(ListenersEvent::Incoming {
350                listener_id,
351                upgrade,
352                local_addr,
353                send_back_addr
354            }) => {
355                return Poll::Ready(NetworkEvent::IncomingConnection {
356                    listener_id,
357                    connection: IncomingConnection {
358                        upgrade,
359                        local_addr,
360                        send_back_addr,
361                    }
362                })
363            }
364            Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => {
365                return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr })
366            }
367            Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
368                return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
369            }
370            Poll::Ready(ListenersEvent::Closed { listener_id, addresses, reason }) => {
371                return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason })
372            }
373            Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
374                return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })
375            }
376        }
377
378        // Poll the known peers.
379        let event = match self.pool.poll(cx) {
380            Poll::Pending => return Poll::Pending,
381            Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
382                if let hash_map::Entry::Occupied(mut e) = self.dialing.entry(connection.peer_id()) {
383                    e.get_mut().retain(|s| s.current.0 != connection.id());
384                    if e.get().is_empty() {
385                        e.remove();
386                    }
387                }
388
389                NetworkEvent::ConnectionEstablished {
390                    connection,
391                    num_established,
392                }
393            }
394            Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => {
395                let dialing = &mut self.dialing;
396                let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
397                if let Some(dial) = next {
398                    let transport = self.listeners.transport().clone();
399                    if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
400                        log::warn!("Dialing aborted: {:?}", e);
401                    }
402                }
403                event
404            }
405            Poll::Ready(PoolEvent::ConnectionClosed { id, connected, error, num_established, .. }) => {
406                NetworkEvent::ConnectionClosed {
407                    id,
408                    connected,
409                    num_established,
410                    error,
411                }
412            }
413            Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
414                NetworkEvent::ConnectionEvent {
415                    connection,
416                    event,
417                }
418            }
419            Poll::Ready(PoolEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
420                NetworkEvent::AddressChange {
421                    connection,
422                    new_endpoint,
423                    old_endpoint,
424                }
425            }
426        };
427
428        Poll::Ready(event)
429    }
430
431    /// Initiates a connection attempt to a known peer.
432    fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
433        -> Result<ConnectionId, ConnectionLimit>
434    where
435        TTrans: Transport<Output = (PeerId, TMuxer)>,
436        TTrans::Dial: Send + 'static,
437        TTrans::Error: Send + 'static,
438        TMuxer: Send + Sync + 'static,
439        TMuxer::OutboundSubstream: Send,
440        TInEvent: Send + 'static,
441        TOutEvent: Send + 'static,
442    {
443        dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
444    }
445}
446
447/// Options for a dialing attempt (i.e. repeated connection attempt
448/// via a list of address) to a peer.
449struct DialingOpts<PeerId, THandler> {
450    peer: PeerId,
451    handler: THandler,
452    address: Multiaddr,
453    remaining: Vec<Multiaddr>,
454}
455
456/// Standalone implementation of `Network::dial_peer` for more granular borrowing.
457fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
458    transport: TTrans,
459    pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
460        <THandler::Handler as ConnectionHandler>::Error>,
461    dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
462    opts: DialingOpts<PeerId, THandler>
463) -> Result<ConnectionId, ConnectionLimit>
464where
465    THandler: IntoConnectionHandler + Send + 'static,
466    <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
467    <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
468    THandler::Handler: ConnectionHandler<
469        Substream = Substream<TMuxer>,
470        InEvent = TInEvent,
471        OutEvent = TOutEvent,
472    > + Send + 'static,
473    TTrans: Transport<Output = (PeerId, TMuxer)>,
474    TTrans::Dial: Send + 'static,
475    TTrans::Error: error::Error + Send + 'static,
476    TMuxer: StreamMuxer + Send + Sync + 'static,
477    TMuxer::OutboundSubstream: Send + 'static,
478    TInEvent: Send + 'static,
479    TOutEvent: Send + 'static,
480{
481    let result = match transport.dial(opts.address.clone()) {
482        Ok(fut) => {
483            let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
484            let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
485            pool.add_outgoing(fut, opts.handler, info)
486        },
487        Err(err) => {
488            let fut = future::err(PendingConnectionError::Transport(err));
489            let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
490            pool.add_outgoing(fut, opts.handler, info)
491        },
492    };
493
494    if let Ok(id) = &result {
495        dialing.entry(opts.peer).or_default().push(
496            peer::DialingState {
497                current: (*id, opts.address),
498                remaining: opts.remaining,
499            },
500        );
501    }
502
503    result
504}
505
506/// Callback for handling a failed connection attempt, returning an
507/// event to emit from the `Network`.
508///
509/// If the failed connection attempt was a dialing attempt and there
510/// are more addresses to try, new `DialingOpts` are returned.
511fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>(
512    dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
513    id: ConnectionId,
514    endpoint: ConnectedPoint,
515    error: PendingConnectionError<TTrans::Error>,
516    handler: Option<THandler>,
517) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>)
518where
519    TTrans: Transport,
520    THandler: IntoConnectionHandler,
521{
522    // Check if the failed connection is associated with a dialing attempt.
523    let dialing_failed = dialing.iter_mut()
524        .find_map(|(peer, attempts)| {
525            if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
526                let attempt = attempts.remove(pos);
527                let last = attempts.is_empty();
528                Some((*peer, attempt, last))
529            } else {
530                None
531            }
532        });
533
534    if let Some((peer_id, mut attempt, last)) = dialing_failed {
535        if last {
536            dialing.remove(&peer_id);
537        }
538
539        let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
540        let failed_addr = attempt.current.1.clone();
541
542        let (opts, attempts_remaining) =
543            if num_remain > 0 {
544                if let Some(handler) = handler {
545                    let next_attempt = attempt.remaining.remove(0);
546                    let opts = DialingOpts {
547                        peer: peer_id,
548                        handler,
549                        address: next_attempt,
550                        remaining: attempt.remaining
551                    };
552                    (Some(opts), num_remain)
553                } else {
554                    // The error is "fatal" for the dialing attempt, since
555                    // the handler was already consumed. All potential
556                    // remaining connection attempts are thus void.
557                    (None, 0)
558                }
559            } else {
560                (None, 0)
561            };
562
563        (opts, NetworkEvent::DialError {
564            attempts_remaining,
565            peer_id,
566            multiaddr: failed_addr,
567            error,
568        })
569    } else {
570        // A pending incoming connection or outgoing connection to an unknown peer failed.
571        match endpoint {
572            ConnectedPoint::Dialer { address } =>
573                (None, NetworkEvent::UnknownPeerDialError {
574                    multiaddr: address,
575                    error,
576                }),
577            ConnectedPoint::Listener { local_addr, send_back_addr } =>
578                (None, NetworkEvent::IncomingConnectionError {
579                    local_addr,
580                    send_back_addr,
581                    error
582                })
583        }
584    }
585}
586
587/// Information about the network obtained by [`Network::info()`].
588#[derive(Clone, Debug)]
589pub struct NetworkInfo {
590    /// The total number of connected peers.
591    num_peers: usize,
592    /// Counters of ongoing network connections.
593    connection_counters: ConnectionCounters,
594}
595
596impl NetworkInfo {
597    /// The number of connected peers, i.e. peers with whom at least
598    /// one established connection exists.
599    pub fn num_peers(&self) -> usize {
600        self.num_peers
601    }
602
603    /// Gets counters for ongoing network connections.
604    pub fn connection_counters(&self) -> &ConnectionCounters {
605        &self.connection_counters
606    }
607}
608
609/// The (optional) configuration for a [`Network`].
610///
611/// The default configuration specifies no dedicated task executor, no
612/// connection limits, a connection event buffer size of 32, and a
613/// `notify_handler` buffer size of 8.
614#[derive(Default)]
615pub struct NetworkConfig {
616    /// Note that the `ManagerConfig`s task command buffer always provides
617    /// one "free" slot per task. Thus the given total `notify_handler_buffer_size`
618    /// exposed for configuration on the `Network` is reduced by one.
619    manager_config: ManagerConfig,
620    /// The effective connection limits.
621    limits: ConnectionLimits,
622}
623
624impl NetworkConfig {
625    /// Configures the executor to use for spawning connection background tasks.
626    pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
627        self.manager_config.executor = Some(e);
628        self
629    }
630
631    /// Configures the executor to use for spawning connection background tasks,
632    /// only if no executor has already been configured.
633    pub fn or_else_with_executor<F>(mut self, f: F) -> Self
634    where
635        F: FnOnce() -> Option<Box<dyn Executor + Send>>
636    {
637        self.manager_config.executor = self.manager_config.executor.or_else(f);
638        self
639    }
640
641    /// Sets the maximum number of events sent to a connection's background task
642    /// that may be buffered, if the task cannot keep up with their consumption and
643    /// delivery to the connection handler.
644    ///
645    /// When the buffer for a particular connection is full, `notify_handler` will no
646    /// longer be able to deliver events to the associated `ConnectionHandler`,
647    /// thus exerting back-pressure on the connection and peer API.
648    pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
649        self.manager_config.task_command_buffer_size = n.get() - 1;
650        self
651    }
652
653    /// Sets the maximum number of buffered connection events (beyond a guaranteed
654    /// buffer of 1 event per connection).
655    ///
656    /// When the buffer is full, the background tasks of all connections will stall.
657    /// In this way, the consumers of network events exert back-pressure on
658    /// the network connection I/O.
659    pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
660        self.manager_config.task_event_buffer_size = n;
661        self
662    }
663
664    /// Sets the connection limits to enforce.
665    pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
666        self.limits = limits;
667        self
668    }
669}
670
671#[cfg(test)]
672mod tests {
673    use super::*;
674
675    struct Dummy;
676
677    impl Executor for Dummy {
678        fn exec(&self, _: Pin<Box<dyn Future<Output=()> + Send>>) { }
679    }
680
681    #[test]
682    fn set_executor() {
683        NetworkConfig::default()
684            .with_executor(Box::new(Dummy))
685            .with_executor(Box::new(|f| {
686                async_std::task::spawn(f);
687            }));
688    }
689}