libp2p_core/connection/
manager.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22    Executor,
23    muxing::StreamMuxer,
24};
25use fnv::FnvHashMap;
26use futures::{
27    prelude::*,
28    channel::mpsc,
29    stream::FuturesUnordered
30};
31use std::{
32    collections::hash_map,
33    error,
34    fmt,
35    mem,
36    pin::Pin,
37    task::{Context, Poll},
38};
39use super::{
40    Connected,
41    ConnectedPoint,
42    Connection,
43    ConnectionError,
44    ConnectionHandler,
45    IntoConnectionHandler,
46    PendingConnectionError,
47    Substream
48};
49use task::{Task, TaskId};
50
51mod task;
52
53// Implementation Notes
54// ====================
55//
56// A `Manager` is decoupled from the background tasks through channels.
57// The state of a `Manager` therefore "lags behind" the progress of
58// the tasks -- it is only made aware of progress in the background tasks
59// when it is `poll()`ed.
60//
61// A `Manager` is ignorant of substreams and does not emit any events
62// related to specific substreams.
63//
64// A `Manager` is unaware of any association between connections and peers
65// / peer identities (i.e. the type parameter `C` is completely opaque).
66//
67// There is a 1-1 correspondence between (internal) task IDs and (public)
68// connection IDs, i.e. the task IDs are "re-exported" as connection IDs
69// by the manager. The notion of a (background) task is internal to the
70// manager.
71
/// The result of a pending connection attempt.
///
/// On success it carries the [`Connected`] information together with the
/// value of type `M` produced by the attempt (bounded by `StreamMuxer` in
/// [`Manager::add_pending`]). On failure it carries a
/// [`PendingConnectionError`] parameterised by the error type `TE`.
type ConnectResult<M, TE> = Result<(Connected, M), PendingConnectionError<TE>>;
74
/// Connection identifier.
///
/// This is a thin wrapper around the manager-internal `TaskId`: every
/// connection ID corresponds to exactly one background task ID.
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ConnectionId(TaskId);
78
79impl ConnectionId {
80    /// Creates a `ConnectionId` from a non-negative integer.
81    ///
82    /// This is primarily useful for creating connection IDs
83    /// in test environments. There is in general no guarantee
84    /// that all connection IDs are based on non-negative integers.
85    pub fn new(id: usize) -> Self {
86        ConnectionId(TaskId(id))
87    }
88}
89
/// A connection `Manager` orchestrates the I/O of a set of connections.
///
/// Type parameters (as constrained by [`Manager::add_pending`] and
/// [`Manager::add`]):
///
/// * `I`: events sent to a connection handler (its `InEvent`).
/// * `O`: events produced by a connection handler (its `OutEvent`).
/// * `H`: the `IntoConnectionHandler` given for each pending connection.
/// * `E`: the error type carried by `PendingConnectionError`.
/// * `HE`: the error type of the connection handler.
pub struct Manager<I, O, H, E, HE> {
    /// The tasks of the managed connections.
    ///
    /// Each managed connection is associated with a (background) task
    /// spawned onto an executor. Each `TaskInfo` in `tasks` is linked to such a
    /// background task via a channel. Closing that channel (i.e. dropping
    /// the sender in the associated `TaskInfo`) stops the background task,
    /// which will attempt to gracefully close the connection.
    tasks: FnvHashMap<TaskId, TaskInfo<I>>,

    /// Next available identifier for a new connection / task.
    ///
    /// Incremented by one every time a connection is added.
    next_task_id: TaskId,

    /// Size of the task command buffer (per task).
    ///
    /// Used as the buffer size of the command channel created for each task.
    task_command_buffer_size: usize,

    /// The executor to use for running the background tasks. If `None`,
    /// the tasks are kept in `local_spawns` instead and polled on the
    /// current thread when the manager is polled for new events.
    executor: Option<Box<dyn Executor + Send>>,

    /// If no `executor` is configured, tasks are kept in this set and
    /// polled on the current thread when the manager is polled for new events.
    local_spawns: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,

    /// Sender distributed to managed tasks for reporting events back
    /// to the manager. A clone is handed to every new task.
    events_tx: mpsc::Sender<task::Event<O, H, E, HE>>,

    /// Receiver for events reported from managed tasks.
    events_rx: mpsc::Receiver<task::Event<O, H, E, HE>>
}
123
124impl<I, O, H, E, HE> fmt::Debug for Manager<I, O, H, E, HE>
125{
126    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127        f.debug_map()
128            .entries(self.tasks.iter().map(|(id, task)| (id, &task.state)))
129            .finish()
130    }
131}
132
/// Configuration options when creating a [`Manager`].
///
/// The default configuration specifies no dedicated task executor, a
/// task event buffer size of 32, and a task command buffer size of 7.
#[non_exhaustive]
pub struct ManagerConfig {
    /// Executor to use to spawn tasks.
    ///
    /// If `None`, the manager keeps the tasks itself and polls them on the
    /// current thread whenever the manager is polled.
    pub executor: Option<Box<dyn Executor + Send>>,

    /// Size of the task command buffer (per task).
    ///
    /// This is the buffer size of the channel from the manager to each
    /// individual task.
    pub task_command_buffer_size: usize,

    /// Size of the task event buffer (for all tasks).
    ///
    /// This is the buffer size of the single channel on which all tasks
    /// report events back to the manager.
    pub task_event_buffer_size: usize,
}
148
149impl Default for ManagerConfig {
150    fn default() -> Self {
151        ManagerConfig {
152            executor: None,
153            task_event_buffer_size: 32,
154            task_command_buffer_size: 7,
155        }
156    }
157}
158
/// Internal information about a running task.
///
/// Contains the sender to deliver commands to the task, and
/// the state of the task as last observed by the `Manager`.
#[derive(Debug)]
struct TaskInfo<I> {
    /// Channel endpoint to send messages to the task.
    ///
    /// Dropping this sender closes the channel, which stops the task.
    sender: mpsc::Sender<task::Command<I>>,
    /// The state of the task as seen by the `Manager`.
    state: TaskState,
}
170
/// Internal state of a running task as seen by the `Manager`.
///
/// A task added via `Manager::add_pending` starts out as `Pending` and is
/// switched to `Established` when the manager processes the task's
/// `Established` event in `Manager::poll`. A task added via `Manager::add`
/// starts out as `Established`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState {
    /// The connection is being established.
    Pending,
    /// The connection is established.
    ///
    /// Carries the connection information, whose endpoint is updated in
    /// place when the task reports an address change.
    Established(Connected),
}
179
/// Events produced by the [`Manager`].
///
/// Variants that refer to a connection which is still managed carry an
/// [`EstablishedEntry`] borrowing the manager for `'a`; variants that report
/// the removal of a connection carry only its [`ConnectionId`].
#[derive(Debug)]
pub enum Event<'a, I, O, H, TE, HE> {
    /// A connection attempt has failed.
    PendingConnectionError {
        /// The connection ID.
        ///
        /// As a result of the error, the pending connection has been removed
        /// from the `Manager` and is being closed. Hence this ID will
        /// no longer resolve to a valid entry in the manager.
        id: ConnectionId,
        /// What happened.
        error: PendingConnectionError<TE>,
        /// The handler that was supposed to handle the failed connection.
        handler: H
    },

    /// An established connection has been closed.
    ConnectionClosed {
        /// The connection ID.
        ///
        /// > **Note**: Closed connections are removed from the `Manager`.
        /// > Hence this ID will no longer resolve to a valid entry in
        /// > the manager.
        id: ConnectionId,
        /// Information about the closed connection.
        connected: Connected,
        /// The error that occurred, if any. If `None`, the connection
        /// has been actively closed.
        error: Option<ConnectionError<HE>>,
    },

    /// A connection has been established.
    ConnectionEstablished {
        /// The entry associated with the new connection.
        entry: EstablishedEntry<'a, I>,
    },

    /// A connection handler has produced an event.
    ConnectionEvent {
        /// The entry associated with the connection that produced the event.
        entry: EstablishedEntry<'a, I>,
        /// The produced event.
        event: O
    },

    /// A connection to a node has changed its address.
    ///
    /// By the time this event is emitted, the endpoint stored by the manager
    /// for the connection has already been replaced by `new_endpoint`.
    AddressChange {
        /// The entry associated with the connection that changed address.
        entry: EstablishedEntry<'a, I>,
        /// The former [`ConnectedPoint`].
        old_endpoint: ConnectedPoint,
        /// The new [`ConnectedPoint`].
        new_endpoint: ConnectedPoint,
    },
}
236
237impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
238    /// Creates a new connection manager.
239    pub fn new(config: ManagerConfig) -> Self {
240        let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
241        Self {
242            tasks: FnvHashMap::default(),
243            next_task_id: TaskId(0),
244            task_command_buffer_size: config.task_command_buffer_size,
245            executor: config.executor,
246            local_spawns: FuturesUnordered::new(),
247            events_tx: tx,
248            events_rx: rx
249        }
250    }
251
252    /// Adds to the manager a future that tries to reach a node.
253    ///
254    /// This method spawns a task dedicated to resolving this future and
255    /// processing the node's events.
256    pub fn add_pending<F, M>(&mut self, future: F, handler: H) -> ConnectionId
257    where
258        I: Send + 'static,
259        O: Send + 'static,
260        TE: error::Error + Send + 'static,
261        HE: error::Error + Send + 'static,
262        M: StreamMuxer + Send + Sync + 'static,
263        M::OutboundSubstream: Send + 'static,
264        F: Future<Output = ConnectResult<M, TE>> + Send + 'static,
265        H: IntoConnectionHandler + Send + 'static,
266        H::Handler: ConnectionHandler<
267            Substream = Substream<M>,
268            InEvent = I,
269            OutEvent = O,
270            Error = HE
271        > + Send + 'static,
272        <H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
273    {
274        let task_id = self.next_task_id;
275        self.next_task_id.0 += 1;
276
277        let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
278        self.tasks.insert(task_id, TaskInfo { sender: tx, state: TaskState::Pending });
279
280        let task = Box::pin(Task::pending(task_id, self.events_tx.clone(), rx, future, handler));
281        if let Some(executor) = &mut self.executor {
282            executor.exec(task);
283        } else {
284            self.local_spawns.push(task);
285        }
286
287        ConnectionId(task_id)
288    }
289
290    /// Adds an existing connection to the manager.
291    pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
292    where
293        H: IntoConnectionHandler + Send + 'static,
294        H::Handler: ConnectionHandler<
295            Substream = Substream<M>,
296            InEvent = I,
297            OutEvent = O,
298            Error = HE
299        > + Send + 'static,
300        <H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
301        TE: error::Error + Send + 'static,
302        HE: error::Error + Send + 'static,
303        I: Send + 'static,
304        O: Send + 'static,
305        M: StreamMuxer + Send + Sync + 'static,
306        M::OutboundSubstream: Send + 'static,
307    {
308        let task_id = self.next_task_id;
309        self.next_task_id.0 += 1;
310
311        let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
312        self.tasks.insert(task_id, TaskInfo {
313            sender: tx, state: TaskState::Established(info)
314        });
315
316        let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _>>> =
317            Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));
318
319        if let Some(executor) = &mut self.executor {
320            executor.exec(task);
321        } else {
322            self.local_spawns.push(task);
323        }
324
325        ConnectionId(task_id)
326    }
327
328    /// Gets an entry for a managed connection, if it exists.
329    pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I>> {
330        if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
331            Some(Entry::new(task))
332        } else {
333            None
334        }
335    }
336
337    /// Checks whether an established connection with the given ID is currently managed.
338    pub fn is_established(&self, id: &ConnectionId) -> bool {
339        matches!(self.tasks.get(&id.0), Some(TaskInfo { state: TaskState::Established(..), .. }))
340    }
341
342    /// Polls the manager for events relating to the managed connections.
343    pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE>> {
344        // Advance the content of `local_spawns`.
345        while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}
346
347        // Poll for the first event for which the manager still has a registered task, if any.
348        let event = loop {
349            match self.events_rx.poll_next_unpin(cx) {
350                Poll::Ready(Some(event)) => {
351                    if self.tasks.contains_key(event.id()) { // (1)
352                        break event
353                    }
354                }
355                Poll::Pending => return Poll::Pending,
356                Poll::Ready(None) => unreachable!("Manager holds both sender and receiver."),
357            }
358        };
359
360        if let hash_map::Entry::Occupied(mut task) = self.tasks.entry(*event.id()) {
361            Poll::Ready(match event {
362                task::Event::Notify { id: _, event } =>
363                    Event::ConnectionEvent {
364                        entry: EstablishedEntry { task },
365                        event
366                    },
367                task::Event::Established { id: _, info } => { // (2)
368                    task.get_mut().state = TaskState::Established(info); // (3)
369                    Event::ConnectionEstablished {
370                        entry: EstablishedEntry { task },
371                    }
372                }
373                task::Event::Failed { id, error, handler } => {
374                    let id = ConnectionId(id);
375                    let _ = task.remove();
376                    Event::PendingConnectionError { id, error, handler }
377                }
378                task::Event::AddressChange { id: _, new_address } => {
379                    let (new, old) = if let TaskState::Established(c) = &mut task.get_mut().state {
380                        let mut new_endpoint = c.endpoint.clone();
381                        new_endpoint.set_remote_address(new_address);
382                        let old_endpoint = mem::replace(&mut c.endpoint, new_endpoint.clone());
383                        (new_endpoint, old_endpoint)
384                    } else {
385                        unreachable!(
386                            "`Event::AddressChange` implies (2) occurred on that task and thus (3)."
387                        )
388                    };
389                    Event::AddressChange {
390                        entry: EstablishedEntry { task },
391                        old_endpoint: old,
392                        new_endpoint: new,
393                    }
394                }
395                task::Event::Closed { id, error } => {
396                    let id = ConnectionId(id);
397                    let task = task.remove();
398                    match task.state {
399                        TaskState::Established(connected) =>
400                            Event::ConnectionClosed { id, connected, error },
401                        TaskState::Pending => unreachable!(
402                            "`Event::Closed` implies (2) occurred on that task and thus (3)."
403                            ),
404                    }
405                }
406            })
407        } else {
408            unreachable!("By (1)")
409        }
410    }
411}
412
/// An entry for a connection in the manager.
///
/// The variant reflects the state of the connection as seen by the
/// manager at the time the entry was obtained.
#[derive(Debug)]
pub enum Entry<'a, I> {
    /// The connection is still being established.
    Pending(PendingEntry<'a, I>),
    /// The connection is established.
    Established(EstablishedEntry<'a, I>)
}
419
420impl<'a, I> Entry<'a, I> {
421    fn new(task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>) -> Self {
422        match &task.get().state {
423            TaskState::Pending => Entry::Pending(PendingEntry { task }),
424            TaskState::Established(_) => Entry::Established(EstablishedEntry { task })
425        }
426    }
427}
428
/// An entry for a managed connection that is considered established.
///
/// The entry mutably borrows the manager for `'a`.
#[derive(Debug)]
pub struct EstablishedEntry<'a, I> {
    /// The occupied slot of the connection's task in the manager's task map.
    task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>,
}
434
435impl<'a, I> EstablishedEntry<'a, I> {
436    /// (Asynchronously) sends an event to the connection handler.
437    ///
438    /// If the handler is not ready to receive the event, either because
439    /// it is busy or the connection is about to close, the given event
440    /// is returned with an `Err`.
441    ///
442    /// If execution of this method is preceded by successful execution of
443    /// `poll_ready_notify_handler` without another intervening execution
444    /// of `notify_handler`, it only fails if the connection is now about
445    /// to close.
446    ///
447    /// > **Note**: As this method does not take a `Context`, the current
448    /// > task _may not be notified_ if sending the event fails due to
449    /// > the connection handler not being ready at this time.
450    pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
451        let cmd = task::Command::NotifyHandler(event); // (*)
452        self.task.get_mut().sender.try_send(cmd)
453            .map_err(|e| match e.into_inner() {
454                task::Command::NotifyHandler(event) => event,
455                _ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*)
456            })
457    }
458
459    /// Checks if `notify_handler` is ready to accept an event.
460    ///
461    /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
462    ///
463    /// Returns `Err(())` if the background task associated with the connection
464    /// is terminating and the connection is about to close.
465    pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
466        self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
467    }
468
469    /// Sends a close command to the associated background task,
470    /// thus initiating a graceful active close of the connection.
471    ///
472    /// Has no effect if the connection is already closing.
473    ///
474    /// When the connection is ultimately closed, [`Event::ConnectionClosed`]
475    /// is emitted by [`Manager::poll`].
476    pub fn start_close(mut self) {
477        // Clone the sender so that we are guaranteed to have
478        // capacity for the close command (every sender gets a slot).
479        match self.task.get_mut().sender.clone().try_send(task::Command::Close) {
480            Ok(()) => {},
481            Err(e) => assert!(e.is_disconnected(), "No capacity for close command.")
482        }
483    }
484
485    /// Obtains information about the established connection.
486    pub fn connected(&self) -> &Connected {
487        match &self.task.get().state {
488            TaskState::Established(c) => c,
489            TaskState::Pending => unreachable!("By Entry::new()")
490        }
491    }
492
493    /// Instantly removes the entry from the manager, dropping
494    /// the command channel to the background task of the connection,
495    /// which will thus drop the connection asap without an orderly
496    /// close or emitting another event.
497    pub fn remove(self) -> Connected {
498        match self.task.remove().state {
499            TaskState::Established(c) => c,
500            TaskState::Pending => unreachable!("By Entry::new()")
501        }
502    }
503
504    /// Returns the connection ID.
505    pub fn id(&self) -> ConnectionId {
506        ConnectionId(*self.task.key())
507    }
508}
509
/// An entry for a managed connection that is currently being established
/// (i.e. pending).
///
/// The entry mutably borrows the manager for `'a`.
#[derive(Debug)]
pub struct PendingEntry<'a, I> {
    /// The occupied slot of the connection's task in the manager's task map.
    task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>
}
516
517impl<'a, I> PendingEntry<'a, I> {
518    /// Returns the connection id.
519    pub fn id(&self) -> ConnectionId {
520        ConnectionId(*self.task.key())
521    }
522
523    /// Aborts the pending connection attempt.
524    pub fn abort(self)  {
525        self.task.remove();
526    }
527}