1mod event;
22pub mod peer;
23
24pub use crate::connection::{ConnectionLimits, ConnectionCounters};
25pub use event::{NetworkEvent, IncomingConnection};
26pub use peer::Peer;
27
28use crate::{
29 ConnectedPoint,
30 Executor,
31 Multiaddr,
32 PeerId,
33 connection::{
34 ConnectionId,
35 ConnectionLimit,
36 ConnectionHandler,
37 IntoConnectionHandler,
38 IncomingInfo,
39 OutgoingInfo,
40 ListenersEvent,
41 ListenerId,
42 ListenersStream,
43 PendingConnectionError,
44 Substream,
45 manager::ManagerConfig,
46 pool::{Pool, PoolEvent},
47 },
48 muxing::StreamMuxer,
49 transport::{Transport, TransportError},
50};
51use fnv::{FnvHashMap};
52use futures::{prelude::*, future};
53use smallvec::SmallVec;
54use std::{
55 collections::hash_map,
56 convert::TryFrom as _,
57 error,
58 fmt,
59 num::NonZeroUsize,
60 pin::Pin,
61 task::{Context, Poll},
62};
63
64pub struct Network<TTrans, TInEvent, TOutEvent, THandler>
66where
67 TTrans: Transport,
68 THandler: IntoConnectionHandler,
69{
70 local_peer_id: PeerId,
72
73 listeners: ListenersStream<TTrans>,
75
76 pool: Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
78 <THandler::Handler as ConnectionHandler>::Error>,
79
80 dialing: FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
93}
94
95impl<TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
96 Network<TTrans, TInEvent, TOutEvent, THandler>
97where
98 TTrans: fmt::Debug + Transport,
99 THandler: fmt::Debug + ConnectionHandler,
100{
101 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
102 f.debug_struct("ReachAttempts")
103 .field("local_peer_id", &self.local_peer_id)
104 .field("listeners", &self.listeners)
105 .field("peers", &self.pool)
106 .field("dialing", &self.dialing)
107 .finish()
108 }
109}
110
111impl<TTrans, TInEvent, TOutEvent, THandler> Unpin for
112 Network<TTrans, TInEvent, TOutEvent, THandler>
113where
114 TTrans: Transport,
115 THandler: IntoConnectionHandler,
116{
117}
118
119impl<TTrans, TInEvent, TOutEvent, THandler>
120 Network<TTrans, TInEvent, TOutEvent, THandler>
121where
122 TTrans: Transport,
123 THandler: IntoConnectionHandler,
124{
125 fn disconnect(&mut self, peer: &PeerId) {
126 self.pool.disconnect(peer);
127 self.dialing.remove(peer);
128 }
129}
130
131impl<TTrans, TInEvent, TOutEvent, TMuxer, THandler>
132 Network<TTrans, TInEvent, TOutEvent, THandler>
133where
134 TTrans: Transport + Clone,
135 TMuxer: StreamMuxer,
136 THandler: IntoConnectionHandler + Send + 'static,
137 THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
138 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
139 <THandler::Handler as ConnectionHandler>::Error: error::Error + Send,
140{
141 pub fn new(
143 transport: TTrans,
144 local_peer_id: PeerId,
145 config: NetworkConfig,
146 ) -> Self {
147 Network {
148 local_peer_id,
149 listeners: ListenersStream::new(transport),
150 pool: Pool::new(local_peer_id, config.manager_config, config.limits),
151 dialing: Default::default(),
152 }
153 }
154
155 pub fn transport(&self) -> &TTrans {
157 self.listeners.transport()
158 }
159
160 pub fn listen_on(&mut self, addr: Multiaddr) -> Result<ListenerId, TransportError<TTrans::Error>> {
162 self.listeners.listen_on(addr)
163 }
164
165 pub fn remove_listener(&mut self, id: ListenerId) -> Result<(), ()> {
169 self.listeners.remove_listener(id)
170 }
171
172 pub fn listen_addrs(&self) -> impl Iterator<Item = &Multiaddr> {
174 self.listeners.listen_addrs()
175 }
176
177 pub fn address_translation<'a>(&'a self, observed_addr: &'a Multiaddr)
190 -> impl Iterator<Item = Multiaddr> + 'a
191 where
192 TMuxer: 'a,
193 THandler: 'a,
194 {
195 let transport = self.listeners.transport();
196 let mut addrs: Vec<_> = self.listen_addrs()
197 .filter_map(move |server| transport.address_translation(server, observed_addr))
198 .collect();
199
200 addrs.sort_unstable();
202 addrs.dedup();
203
204 addrs.into_iter()
205 }
206
207 pub fn local_peer_id(&self) -> &PeerId {
209 &self.local_peer_id
210 }
211
212 pub fn dial(&mut self, address: &Multiaddr, handler: THandler)
218 -> Result<ConnectionId, ConnectionLimit>
219 where
220 TTrans: Transport<Output = (PeerId, TMuxer)>,
221 TTrans::Error: Send + 'static,
222 TTrans::Dial: Send + 'static,
223 TMuxer: Send + Sync + 'static,
224 TMuxer::OutboundSubstream: Send,
225 TInEvent: Send + 'static,
226 TOutEvent: Send + 'static,
227 {
228 let info = OutgoingInfo { address, peer_id: None };
229 match self.transport().clone().dial(address.clone()) {
230 Ok(f) => {
231 let f = f.map_err(|err| PendingConnectionError::Transport(TransportError::Other(err)));
232 self.pool.add_outgoing(f, handler, info)
233 }
234 Err(err) => {
235 let f = future::err(PendingConnectionError::Transport(err));
236 self.pool.add_outgoing(f, handler, info)
237 }
238 }
239 }
240
241 pub fn info(&self) -> NetworkInfo {
243 let num_peers = self.pool.num_peers();
244 let connection_counters = self.pool.counters().clone();
245 NetworkInfo {
246 num_peers,
247 connection_counters,
248 }
249 }
250
251 pub fn incoming_info(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
253 self.pool.iter_pending_incoming()
254 }
255
256 pub fn unknown_dials(&self) -> impl Iterator<Item = &Multiaddr> {
258 self.pool.iter_pending_outgoing()
259 .filter_map(|info| {
260 if info.peer_id.is_none() {
261 Some(info.address)
262 } else {
263 None
264 }
265 })
266 }
267
268 pub fn connected_peers(&self) -> impl Iterator<Item = &PeerId> {
271 self.pool.iter_connected()
272 }
273
274 pub fn is_connected(&self, peer: &PeerId) -> bool {
276 self.pool.is_connected(peer)
277 }
278
279 pub fn is_dialing(&self, peer: &PeerId) -> bool {
281 self.dialing.contains_key(peer)
282 }
283
284 pub fn is_disconnected(&self, peer: &PeerId) -> bool {
287 !self.is_connected(peer) && !self.is_dialing(peer)
288 }
289
290 pub fn dialing_peers(&self) -> impl Iterator<Item = &PeerId> {
293 self.dialing.keys()
294 }
295
296 pub fn peer(&mut self, peer_id: PeerId)
298 -> Peer<'_, TTrans, TInEvent, TOutEvent, THandler>
299 {
300 Peer::new(self, peer_id)
301 }
302
303 pub fn accept(
309 &mut self,
310 connection: IncomingConnection<TTrans::ListenerUpgrade>,
311 handler: THandler,
312 ) -> Result<ConnectionId, ConnectionLimit>
313 where
314 TInEvent: Send + 'static,
315 TOutEvent: Send + 'static,
316 TMuxer: StreamMuxer + Send + Sync + 'static,
317 TMuxer::OutboundSubstream: Send,
318 TTrans: Transport<Output = (PeerId, TMuxer)>,
319 TTrans::Error: Send + 'static,
320 TTrans::ListenerUpgrade: Send + 'static,
321 {
322 let upgrade = connection.upgrade.map_err(|err|
323 PendingConnectionError::Transport(TransportError::Other(err)));
324 let info = IncomingInfo {
325 local_addr: &connection.local_addr,
326 send_back_addr: &connection.send_back_addr,
327 };
328 self.pool.add_incoming(upgrade, handler, info)
329 }
330
331 pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>>
333 where
334 TTrans: Transport<Output = (PeerId, TMuxer)>,
335 TTrans::Error: Send + 'static,
336 TTrans::Dial: Send + 'static,
337 TTrans::ListenerUpgrade: Send + 'static,
338 TMuxer: Send + Sync + 'static,
339 TMuxer::OutboundSubstream: Send,
340 TInEvent: Send + 'static,
341 TOutEvent: Send + 'static,
342 THandler: IntoConnectionHandler + Send + 'static,
343 THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send + 'static,
344 <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
345 {
346 match ListenersStream::poll(Pin::new(&mut self.listeners), cx) {
348 Poll::Pending => (),
349 Poll::Ready(ListenersEvent::Incoming {
350 listener_id,
351 upgrade,
352 local_addr,
353 send_back_addr
354 }) => {
355 return Poll::Ready(NetworkEvent::IncomingConnection {
356 listener_id,
357 connection: IncomingConnection {
358 upgrade,
359 local_addr,
360 send_back_addr,
361 }
362 })
363 }
364 Poll::Ready(ListenersEvent::NewAddress { listener_id, listen_addr }) => {
365 return Poll::Ready(NetworkEvent::NewListenerAddress { listener_id, listen_addr })
366 }
367 Poll::Ready(ListenersEvent::AddressExpired { listener_id, listen_addr }) => {
368 return Poll::Ready(NetworkEvent::ExpiredListenerAddress { listener_id, listen_addr })
369 }
370 Poll::Ready(ListenersEvent::Closed { listener_id, addresses, reason }) => {
371 return Poll::Ready(NetworkEvent::ListenerClosed { listener_id, addresses, reason })
372 }
373 Poll::Ready(ListenersEvent::Error { listener_id, error }) => {
374 return Poll::Ready(NetworkEvent::ListenerError { listener_id, error })
375 }
376 }
377
378 let event = match self.pool.poll(cx) {
380 Poll::Pending => return Poll::Pending,
381 Poll::Ready(PoolEvent::ConnectionEstablished { connection, num_established }) => {
382 if let hash_map::Entry::Occupied(mut e) = self.dialing.entry(connection.peer_id()) {
383 e.get_mut().retain(|s| s.current.0 != connection.id());
384 if e.get().is_empty() {
385 e.remove();
386 }
387 }
388
389 NetworkEvent::ConnectionEstablished {
390 connection,
391 num_established,
392 }
393 }
394 Poll::Ready(PoolEvent::PendingConnectionError { id, endpoint, error, handler, pool, .. }) => {
395 let dialing = &mut self.dialing;
396 let (next, event) = on_connection_failed(dialing, id, endpoint, error, handler);
397 if let Some(dial) = next {
398 let transport = self.listeners.transport().clone();
399 if let Err(e) = dial_peer_impl(transport, pool, dialing, dial) {
400 log::warn!("Dialing aborted: {:?}", e);
401 }
402 }
403 event
404 }
405 Poll::Ready(PoolEvent::ConnectionClosed { id, connected, error, num_established, .. }) => {
406 NetworkEvent::ConnectionClosed {
407 id,
408 connected,
409 num_established,
410 error,
411 }
412 }
413 Poll::Ready(PoolEvent::ConnectionEvent { connection, event }) => {
414 NetworkEvent::ConnectionEvent {
415 connection,
416 event,
417 }
418 }
419 Poll::Ready(PoolEvent::AddressChange { connection, new_endpoint, old_endpoint }) => {
420 NetworkEvent::AddressChange {
421 connection,
422 new_endpoint,
423 old_endpoint,
424 }
425 }
426 };
427
428 Poll::Ready(event)
429 }
430
431 fn dial_peer(&mut self, opts: DialingOpts<PeerId, THandler>)
433 -> Result<ConnectionId, ConnectionLimit>
434 where
435 TTrans: Transport<Output = (PeerId, TMuxer)>,
436 TTrans::Dial: Send + 'static,
437 TTrans::Error: Send + 'static,
438 TMuxer: Send + Sync + 'static,
439 TMuxer::OutboundSubstream: Send,
440 TInEvent: Send + 'static,
441 TOutEvent: Send + 'static,
442 {
443 dial_peer_impl(self.transport().clone(), &mut self.pool, &mut self.dialing, opts)
444 }
445}
446
447struct DialingOpts<PeerId, THandler> {
450 peer: PeerId,
451 handler: THandler,
452 address: Multiaddr,
453 remaining: Vec<Multiaddr>,
454}
455
456fn dial_peer_impl<TMuxer, TInEvent, TOutEvent, THandler, TTrans>(
458 transport: TTrans,
459 pool: &mut Pool<TInEvent, TOutEvent, THandler, TTrans::Error,
460 <THandler::Handler as ConnectionHandler>::Error>,
461 dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
462 opts: DialingOpts<PeerId, THandler>
463) -> Result<ConnectionId, ConnectionLimit>
464where
465 THandler: IntoConnectionHandler + Send + 'static,
466 <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
467 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
468 THandler::Handler: ConnectionHandler<
469 Substream = Substream<TMuxer>,
470 InEvent = TInEvent,
471 OutEvent = TOutEvent,
472 > + Send + 'static,
473 TTrans: Transport<Output = (PeerId, TMuxer)>,
474 TTrans::Dial: Send + 'static,
475 TTrans::Error: error::Error + Send + 'static,
476 TMuxer: StreamMuxer + Send + Sync + 'static,
477 TMuxer::OutboundSubstream: Send + 'static,
478 TInEvent: Send + 'static,
479 TOutEvent: Send + 'static,
480{
481 let result = match transport.dial(opts.address.clone()) {
482 Ok(fut) => {
483 let fut = fut.map_err(|e| PendingConnectionError::Transport(TransportError::Other(e)));
484 let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
485 pool.add_outgoing(fut, opts.handler, info)
486 },
487 Err(err) => {
488 let fut = future::err(PendingConnectionError::Transport(err));
489 let info = OutgoingInfo { address: &opts.address, peer_id: Some(&opts.peer) };
490 pool.add_outgoing(fut, opts.handler, info)
491 },
492 };
493
494 if let Ok(id) = &result {
495 dialing.entry(opts.peer).or_default().push(
496 peer::DialingState {
497 current: (*id, opts.address),
498 remaining: opts.remaining,
499 },
500 );
501 }
502
503 result
504}
505
506fn on_connection_failed<'a, TTrans, TInEvent, TOutEvent, THandler>(
512 dialing: &mut FnvHashMap<PeerId, SmallVec<[peer::DialingState; 10]>>,
513 id: ConnectionId,
514 endpoint: ConnectedPoint,
515 error: PendingConnectionError<TTrans::Error>,
516 handler: Option<THandler>,
517) -> (Option<DialingOpts<PeerId, THandler>>, NetworkEvent<'a, TTrans, TInEvent, TOutEvent, THandler>)
518where
519 TTrans: Transport,
520 THandler: IntoConnectionHandler,
521{
522 let dialing_failed = dialing.iter_mut()
524 .find_map(|(peer, attempts)| {
525 if let Some(pos) = attempts.iter().position(|s| s.current.0 == id) {
526 let attempt = attempts.remove(pos);
527 let last = attempts.is_empty();
528 Some((*peer, attempt, last))
529 } else {
530 None
531 }
532 });
533
534 if let Some((peer_id, mut attempt, last)) = dialing_failed {
535 if last {
536 dialing.remove(&peer_id);
537 }
538
539 let num_remain = u32::try_from(attempt.remaining.len()).unwrap();
540 let failed_addr = attempt.current.1.clone();
541
542 let (opts, attempts_remaining) =
543 if num_remain > 0 {
544 if let Some(handler) = handler {
545 let next_attempt = attempt.remaining.remove(0);
546 let opts = DialingOpts {
547 peer: peer_id,
548 handler,
549 address: next_attempt,
550 remaining: attempt.remaining
551 };
552 (Some(opts), num_remain)
553 } else {
554 (None, 0)
558 }
559 } else {
560 (None, 0)
561 };
562
563 (opts, NetworkEvent::DialError {
564 attempts_remaining,
565 peer_id,
566 multiaddr: failed_addr,
567 error,
568 })
569 } else {
570 match endpoint {
572 ConnectedPoint::Dialer { address } =>
573 (None, NetworkEvent::UnknownPeerDialError {
574 multiaddr: address,
575 error,
576 }),
577 ConnectedPoint::Listener { local_addr, send_back_addr } =>
578 (None, NetworkEvent::IncomingConnectionError {
579 local_addr,
580 send_back_addr,
581 error
582 })
583 }
584 }
585}
586
587#[derive(Clone, Debug)]
589pub struct NetworkInfo {
590 num_peers: usize,
592 connection_counters: ConnectionCounters,
594}
595
596impl NetworkInfo {
597 pub fn num_peers(&self) -> usize {
600 self.num_peers
601 }
602
603 pub fn connection_counters(&self) -> &ConnectionCounters {
605 &self.connection_counters
606 }
607}
608
609#[derive(Default)]
615pub struct NetworkConfig {
616 manager_config: ManagerConfig,
620 limits: ConnectionLimits,
622}
623
624impl NetworkConfig {
625 pub fn with_executor(mut self, e: Box<dyn Executor + Send>) -> Self {
627 self.manager_config.executor = Some(e);
628 self
629 }
630
631 pub fn or_else_with_executor<F>(mut self, f: F) -> Self
634 where
635 F: FnOnce() -> Option<Box<dyn Executor + Send>>
636 {
637 self.manager_config.executor = self.manager_config.executor.or_else(f);
638 self
639 }
640
641 pub fn with_notify_handler_buffer_size(mut self, n: NonZeroUsize) -> Self {
649 self.manager_config.task_command_buffer_size = n.get() - 1;
650 self
651 }
652
653 pub fn with_connection_event_buffer_size(mut self, n: usize) -> Self {
660 self.manager_config.task_event_buffer_size = n;
661 self
662 }
663
664 pub fn with_connection_limits(mut self, limits: ConnectionLimits) -> Self {
666 self.limits = limits;
667 self
668 }
669}
670
#[cfg(test)]
mod tests {
    use super::*;

    /// Executor that ignores every future it is given.
    struct Dummy;

    impl Executor for Dummy {
        fn exec(&self, _future: Pin<Box<dyn Future<Output=()> + Send>>) {}
    }

    /// Both a dedicated executor type and a plain closure must be accepted
    /// by `NetworkConfig::with_executor`.
    #[test]
    fn set_executor() {
        let with_struct = NetworkConfig::default().with_executor(Box::new(Dummy));
        let _with_closure = with_struct.with_executor(Box::new(|f| {
            async_std::task::spawn(f);
        }));
    }
}
689}