libp2p_core/connection/manager.rs
1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::{
22 Executor,
23 muxing::StreamMuxer,
24};
25use fnv::FnvHashMap;
26use futures::{
27 prelude::*,
28 channel::mpsc,
29 stream::FuturesUnordered
30};
31use std::{
32 collections::hash_map,
33 error,
34 fmt,
35 mem,
36 pin::Pin,
37 task::{Context, Poll},
38};
39use super::{
40 Connected,
41 ConnectedPoint,
42 Connection,
43 ConnectionError,
44 ConnectionHandler,
45 IntoConnectionHandler,
46 PendingConnectionError,
47 Substream
48};
49use task::{Task, TaskId};
50
51mod task;
52
53// Implementation Notes
54// ====================
55//
56// A `Manager` is decoupled from the background tasks through channels.
57// The state of a `Manager` therefore "lags behind" the progress of
58// the tasks -- it is only made aware of progress in the background tasks
59// when it is `poll()`ed.
60//
61// A `Manager` is ignorant of substreams and does not emit any events
62// related to specific substreams.
63//
64// A `Manager` is unaware of any association between connections and peers
65// / peer identities (i.e. the type parameter `C` is completely opaque).
66//
67// There is a 1-1 correspondence between (internal) task IDs and (public)
68// connection IDs, i.e. the task IDs are "re-exported" as connection IDs
69// by the manager. The notion of a (background) task is internal to the
70// manager.
71
72/// The result of a pending connection attempt.
73type ConnectResult<M, TE> = Result<(Connected, M), PendingConnectionError<TE>>;
74
75/// Connection identifier.
76#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
77pub struct ConnectionId(TaskId);
78
79impl ConnectionId {
80 /// Creates a `ConnectionId` from a non-negative integer.
81 ///
82 /// This is primarily useful for creating connection IDs
83 /// in test environments. There is in general no guarantee
84 /// that all connection IDs are based on non-negative integers.
85 pub fn new(id: usize) -> Self {
86 ConnectionId(TaskId(id))
87 }
88}
89
90/// A connection `Manager` orchestrates the I/O of a set of connections.
91pub struct Manager<I, O, H, E, HE> {
92 /// The tasks of the managed connections.
93 ///
94 /// Each managed connection is associated with a (background) task
95 /// spawned onto an executor. Each `TaskInfo` in `tasks` is linked to such a
96 /// background task via a channel. Closing that channel (i.e. dropping
97 /// the sender in the associated `TaskInfo`) stops the background task,
98 /// which will attempt to gracefully close the connection.
99 tasks: FnvHashMap<TaskId, TaskInfo<I>>,
100
101 /// Next available identifier for a new connection / task.
102 next_task_id: TaskId,
103
104 /// Size of the task command buffer (per task).
105 task_command_buffer_size: usize,
106
107 /// The executor to use for running the background tasks. If `None`,
108 /// the tasks are kept in `local_spawns` instead and polled on the
109 /// current thread when the manager is polled for new events.
110 executor: Option<Box<dyn Executor + Send>>,
111
112 /// If no `executor` is configured, tasks are kept in this set and
113 /// polled on the current thread when the manager is polled for new events.
114 local_spawns: FuturesUnordered<Pin<Box<dyn Future<Output = ()> + Send>>>,
115
116 /// Sender distributed to managed tasks for reporting events back
117 /// to the manager.
118 events_tx: mpsc::Sender<task::Event<O, H, E, HE>>,
119
120 /// Receiver for events reported from managed tasks.
121 events_rx: mpsc::Receiver<task::Event<O, H, E, HE>>
122}
123
124impl<I, O, H, E, HE> fmt::Debug for Manager<I, O, H, E, HE>
125{
126 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
127 f.debug_map()
128 .entries(self.tasks.iter().map(|(id, task)| (id, &task.state)))
129 .finish()
130 }
131}
132
133/// Configuration options when creating a [`Manager`].
134///
135/// The default configuration specifies no dedicated task executor, a
136/// task event buffer size of 32, and a task command buffer size of 7.
137#[non_exhaustive]
138pub struct ManagerConfig {
139 /// Executor to use to spawn tasks.
140 pub executor: Option<Box<dyn Executor + Send>>,
141
142 /// Size of the task command buffer (per task).
143 pub task_command_buffer_size: usize,
144
145 /// Size of the task event buffer (for all tasks).
146 pub task_event_buffer_size: usize,
147}
148
149impl Default for ManagerConfig {
150 fn default() -> Self {
151 ManagerConfig {
152 executor: None,
153 task_event_buffer_size: 32,
154 task_command_buffer_size: 7,
155 }
156 }
157}
158
159/// Internal information about a running task.
160///
161/// Contains the sender to deliver event messages to the task, and
162/// the associated user data.
163#[derive(Debug)]
164struct TaskInfo<I> {
165 /// Channel endpoint to send messages to the task.
166 sender: mpsc::Sender<task::Command<I>>,
167 /// The state of the task as seen by the `Manager`.
168 state: TaskState,
169}
170
171/// Internal state of a running task as seen by the `Manager`.
172#[derive(Debug, Clone, PartialEq, Eq)]
173enum TaskState {
174 /// The connection is being established.
175 Pending,
176 /// The connection is established.
177 Established(Connected),
178}
179
180/// Events produced by the [`Manager`].
181#[derive(Debug)]
182pub enum Event<'a, I, O, H, TE, HE> {
183 /// A connection attempt has failed.
184 PendingConnectionError {
185 /// The connection ID.
186 ///
187 /// As a result of the error, the pending connection has been removed
188 /// from the `Manager` and is being closed. Hence this ID will
189 /// no longer resolve to a valid entry in the manager.
190 id: ConnectionId,
191 /// What happened.
192 error: PendingConnectionError<TE>,
193 /// The handler that was supposed to handle the failed connection.
194 handler: H
195 },
196
197 /// An established connection has been closed.
198 ConnectionClosed {
199 /// The connection ID.
200 ///
201 /// > **Note**: Closed connections are removed from the `Manager`.
202 /// > Hence this ID will no longer resolve to a valid entry in
203 /// > the manager.
204 id: ConnectionId,
205 /// Information about the closed connection.
206 connected: Connected,
207 /// The error that occurred, if any. If `None`, the connection
208 /// has been actively closed.
209 error: Option<ConnectionError<HE>>,
210 },
211
212 /// A connection has been established.
213 ConnectionEstablished {
214 /// The entry associated with the new connection.
215 entry: EstablishedEntry<'a, I>,
216 },
217
218 /// A connection handler has produced an event.
219 ConnectionEvent {
220 /// The entry associated with the connection that produced the event.
221 entry: EstablishedEntry<'a, I>,
222 /// The produced event.
223 event: O
224 },
225
226 /// A connection to a node has changed its address.
227 AddressChange {
228 /// The entry associated with the connection that changed address.
229 entry: EstablishedEntry<'a, I>,
230 /// The former [`ConnectedPoint`].
231 old_endpoint: ConnectedPoint,
232 /// The new [`ConnectedPoint`].
233 new_endpoint: ConnectedPoint,
234 },
235}
236
237impl<I, O, H, TE, HE> Manager<I, O, H, TE, HE> {
238 /// Creates a new connection manager.
239 pub fn new(config: ManagerConfig) -> Self {
240 let (tx, rx) = mpsc::channel(config.task_event_buffer_size);
241 Self {
242 tasks: FnvHashMap::default(),
243 next_task_id: TaskId(0),
244 task_command_buffer_size: config.task_command_buffer_size,
245 executor: config.executor,
246 local_spawns: FuturesUnordered::new(),
247 events_tx: tx,
248 events_rx: rx
249 }
250 }
251
252 /// Adds to the manager a future that tries to reach a node.
253 ///
254 /// This method spawns a task dedicated to resolving this future and
255 /// processing the node's events.
256 pub fn add_pending<F, M>(&mut self, future: F, handler: H) -> ConnectionId
257 where
258 I: Send + 'static,
259 O: Send + 'static,
260 TE: error::Error + Send + 'static,
261 HE: error::Error + Send + 'static,
262 M: StreamMuxer + Send + Sync + 'static,
263 M::OutboundSubstream: Send + 'static,
264 F: Future<Output = ConnectResult<M, TE>> + Send + 'static,
265 H: IntoConnectionHandler + Send + 'static,
266 H::Handler: ConnectionHandler<
267 Substream = Substream<M>,
268 InEvent = I,
269 OutEvent = O,
270 Error = HE
271 > + Send + 'static,
272 <H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
273 {
274 let task_id = self.next_task_id;
275 self.next_task_id.0 += 1;
276
277 let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
278 self.tasks.insert(task_id, TaskInfo { sender: tx, state: TaskState::Pending });
279
280 let task = Box::pin(Task::pending(task_id, self.events_tx.clone(), rx, future, handler));
281 if let Some(executor) = &mut self.executor {
282 executor.exec(task);
283 } else {
284 self.local_spawns.push(task);
285 }
286
287 ConnectionId(task_id)
288 }
289
290 /// Adds an existing connection to the manager.
291 pub fn add<M>(&mut self, conn: Connection<M, H::Handler>, info: Connected) -> ConnectionId
292 where
293 H: IntoConnectionHandler + Send + 'static,
294 H::Handler: ConnectionHandler<
295 Substream = Substream<M>,
296 InEvent = I,
297 OutEvent = O,
298 Error = HE
299 > + Send + 'static,
300 <H::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
301 TE: error::Error + Send + 'static,
302 HE: error::Error + Send + 'static,
303 I: Send + 'static,
304 O: Send + 'static,
305 M: StreamMuxer + Send + Sync + 'static,
306 M::OutboundSubstream: Send + 'static,
307 {
308 let task_id = self.next_task_id;
309 self.next_task_id.0 += 1;
310
311 let (tx, rx) = mpsc::channel(self.task_command_buffer_size);
312 self.tasks.insert(task_id, TaskInfo {
313 sender: tx, state: TaskState::Established(info)
314 });
315
316 let task: Pin<Box<Task<Pin<Box<future::Pending<_>>>, _, _, _, _, _>>> =
317 Box::pin(Task::established(task_id, self.events_tx.clone(), rx, conn));
318
319 if let Some(executor) = &mut self.executor {
320 executor.exec(task);
321 } else {
322 self.local_spawns.push(task);
323 }
324
325 ConnectionId(task_id)
326 }
327
328 /// Gets an entry for a managed connection, if it exists.
329 pub fn entry(&mut self, id: ConnectionId) -> Option<Entry<'_, I>> {
330 if let hash_map::Entry::Occupied(task) = self.tasks.entry(id.0) {
331 Some(Entry::new(task))
332 } else {
333 None
334 }
335 }
336
337 /// Checks whether an established connection with the given ID is currently managed.
338 pub fn is_established(&self, id: &ConnectionId) -> bool {
339 matches!(self.tasks.get(&id.0), Some(TaskInfo { state: TaskState::Established(..), .. }))
340 }
341
342 /// Polls the manager for events relating to the managed connections.
343 pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<Event<'a, I, O, H, TE, HE>> {
344 // Advance the content of `local_spawns`.
345 while let Poll::Ready(Some(_)) = self.local_spawns.poll_next_unpin(cx) {}
346
347 // Poll for the first event for which the manager still has a registered task, if any.
348 let event = loop {
349 match self.events_rx.poll_next_unpin(cx) {
350 Poll::Ready(Some(event)) => {
351 if self.tasks.contains_key(event.id()) { // (1)
352 break event
353 }
354 }
355 Poll::Pending => return Poll::Pending,
356 Poll::Ready(None) => unreachable!("Manager holds both sender and receiver."),
357 }
358 };
359
360 if let hash_map::Entry::Occupied(mut task) = self.tasks.entry(*event.id()) {
361 Poll::Ready(match event {
362 task::Event::Notify { id: _, event } =>
363 Event::ConnectionEvent {
364 entry: EstablishedEntry { task },
365 event
366 },
367 task::Event::Established { id: _, info } => { // (2)
368 task.get_mut().state = TaskState::Established(info); // (3)
369 Event::ConnectionEstablished {
370 entry: EstablishedEntry { task },
371 }
372 }
373 task::Event::Failed { id, error, handler } => {
374 let id = ConnectionId(id);
375 let _ = task.remove();
376 Event::PendingConnectionError { id, error, handler }
377 }
378 task::Event::AddressChange { id: _, new_address } => {
379 let (new, old) = if let TaskState::Established(c) = &mut task.get_mut().state {
380 let mut new_endpoint = c.endpoint.clone();
381 new_endpoint.set_remote_address(new_address);
382 let old_endpoint = mem::replace(&mut c.endpoint, new_endpoint.clone());
383 (new_endpoint, old_endpoint)
384 } else {
385 unreachable!(
386 "`Event::AddressChange` implies (2) occurred on that task and thus (3)."
387 )
388 };
389 Event::AddressChange {
390 entry: EstablishedEntry { task },
391 old_endpoint: old,
392 new_endpoint: new,
393 }
394 }
395 task::Event::Closed { id, error } => {
396 let id = ConnectionId(id);
397 let task = task.remove();
398 match task.state {
399 TaskState::Established(connected) =>
400 Event::ConnectionClosed { id, connected, error },
401 TaskState::Pending => unreachable!(
402 "`Event::Closed` implies (2) occurred on that task and thus (3)."
403 ),
404 }
405 }
406 })
407 } else {
408 unreachable!("By (1)")
409 }
410 }
411}
412
413/// An entry for a connection in the manager.
414#[derive(Debug)]
415pub enum Entry<'a, I> {
416 Pending(PendingEntry<'a, I>),
417 Established(EstablishedEntry<'a, I>)
418}
419
420impl<'a, I> Entry<'a, I> {
421 fn new(task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>) -> Self {
422 match &task.get().state {
423 TaskState::Pending => Entry::Pending(PendingEntry { task }),
424 TaskState::Established(_) => Entry::Established(EstablishedEntry { task })
425 }
426 }
427}
428
429/// An entry for a managed connection that is considered established.
430#[derive(Debug)]
431pub struct EstablishedEntry<'a, I> {
432 task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>,
433}
434
435impl<'a, I> EstablishedEntry<'a, I> {
436 /// (Asynchronously) sends an event to the connection handler.
437 ///
438 /// If the handler is not ready to receive the event, either because
439 /// it is busy or the connection is about to close, the given event
440 /// is returned with an `Err`.
441 ///
442 /// If execution of this method is preceded by successful execution of
443 /// `poll_ready_notify_handler` without another intervening execution
444 /// of `notify_handler`, it only fails if the connection is now about
445 /// to close.
446 ///
447 /// > **Note**: As this method does not take a `Context`, the current
448 /// > task _may not be notified_ if sending the event fails due to
449 /// > the connection handler not being ready at this time.
450 pub fn notify_handler(&mut self, event: I) -> Result<(), I> {
451 let cmd = task::Command::NotifyHandler(event); // (*)
452 self.task.get_mut().sender.try_send(cmd)
453 .map_err(|e| match e.into_inner() {
454 task::Command::NotifyHandler(event) => event,
455 _ => panic!("Unexpected command. Expected `NotifyHandler`") // see (*)
456 })
457 }
458
459 /// Checks if `notify_handler` is ready to accept an event.
460 ///
461 /// Returns `Ok(())` if the handler is ready to receive an event via `notify_handler`.
462 ///
463 /// Returns `Err(())` if the background task associated with the connection
464 /// is terminating and the connection is about to close.
465 pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
466 self.task.get_mut().sender.poll_ready(cx).map_err(|_| ())
467 }
468
469 /// Sends a close command to the associated background task,
470 /// thus initiating a graceful active close of the connection.
471 ///
472 /// Has no effect if the connection is already closing.
473 ///
474 /// When the connection is ultimately closed, [`Event::ConnectionClosed`]
475 /// is emitted by [`Manager::poll`].
476 pub fn start_close(mut self) {
477 // Clone the sender so that we are guaranteed to have
478 // capacity for the close command (every sender gets a slot).
479 match self.task.get_mut().sender.clone().try_send(task::Command::Close) {
480 Ok(()) => {},
481 Err(e) => assert!(e.is_disconnected(), "No capacity for close command.")
482 }
483 }
484
485 /// Obtains information about the established connection.
486 pub fn connected(&self) -> &Connected {
487 match &self.task.get().state {
488 TaskState::Established(c) => c,
489 TaskState::Pending => unreachable!("By Entry::new()")
490 }
491 }
492
493 /// Instantly removes the entry from the manager, dropping
494 /// the command channel to the background task of the connection,
495 /// which will thus drop the connection asap without an orderly
496 /// close or emitting another event.
497 pub fn remove(self) -> Connected {
498 match self.task.remove().state {
499 TaskState::Established(c) => c,
500 TaskState::Pending => unreachable!("By Entry::new()")
501 }
502 }
503
504 /// Returns the connection ID.
505 pub fn id(&self) -> ConnectionId {
506 ConnectionId(*self.task.key())
507 }
508}
509
510/// An entry for a managed connection that is currently being established
511/// (i.e. pending).
512#[derive(Debug)]
513pub struct PendingEntry<'a, I> {
514 task: hash_map::OccupiedEntry<'a, TaskId, TaskInfo<I>>
515}
516
517impl<'a, I> PendingEntry<'a, I> {
518 /// Returns the connection id.
519 pub fn id(&self) -> ConnectionId {
520 ConnectionId(*self.task.key())
521 }
522
523 /// Aborts the pending connection attempt.
524 pub fn abort(self) {
525 self.task.remove();
526 }
527}