1use crate::{
22 Multiaddr,
23 Transport,
24 StreamMuxer,
25 connection::{
26 Connected,
27 ConnectedPoint,
28 ConnectionHandler,
29 Connection,
30 ConnectionId,
31 ConnectionLimit,
32 EstablishedConnection,
33 EstablishedConnectionIter,
34 IntoConnectionHandler,
35 PendingConnection,
36 Substream,
37 pool::Pool,
38 },
39 PeerId
40};
41use fnv::FnvHashMap;
42use smallvec::SmallVec;
43use std::{
44 collections::hash_map,
45 error,
46 fmt,
47};
48use super::{Network, DialingOpts};
49
50pub enum Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
57where
58 TTrans: Transport,
59 THandler: IntoConnectionHandler
60{
61 Connected(ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
63
64 Dialing(DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
68
69 Disconnected(DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
73
74 Local,
76}
77
78impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
79 Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
80where
81 TTrans: Transport,
82 THandler: IntoConnectionHandler,
83{
84 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
85 match self {
86 Peer::Connected(p) => {
87 f.debug_struct("Connected")
88 .field("peer", &p)
89 .finish()
90 }
91 Peer::Dialing(p) => {
92 f.debug_struct("Dialing")
93 .field("peer", &p)
94 .finish()
95 }
96 Peer::Disconnected(p) => {
97 f.debug_struct("Disconnected")
98 .field("peer", &p)
99 .finish()
100 }
101 Peer::Local => {
102 f.debug_struct("Local")
103 .finish()
104 }
105 }
106 }
107}
108
109impl<'a, TTrans, TInEvent, TOutEvent, THandler>
110 Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
111where
112 TTrans: Transport,
113 THandler: IntoConnectionHandler,
114{
115 pub(super) fn new(
116 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
117 peer_id: PeerId
118 ) -> Self {
119 if peer_id == network.local_peer_id {
120 return Peer::Local;
121 }
122
123 if network.pool.is_connected(&peer_id) {
124 return Self::connected(network, peer_id)
125 }
126
127 if network.dialing.get_mut(&peer_id).is_some() {
128 return Self::dialing(network, peer_id);
129 }
130
131 Self::disconnected(network, peer_id)
132 }
133
134
135 fn disconnected(
136 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
137 peer_id: PeerId
138 ) -> Self {
139 Peer::Disconnected(DisconnectedPeer { network, peer_id })
140 }
141
142 fn connected(
143 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
144 peer_id: PeerId
145 ) -> Self {
146 Peer::Connected(ConnectedPeer { network, peer_id })
147 }
148
149 fn dialing(
150 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
151 peer_id: PeerId
152 ) -> Self {
153 Peer::Dialing(DialingPeer { network, peer_id })
154 }
155}
156
157impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler>
158 Peer<'a, TTrans, TInEvent, TOutEvent, THandler>
159where
160 TTrans: Transport<Output = (PeerId, TMuxer)> + Clone,
161 TTrans::Error: Send + 'static,
162 TTrans::Dial: Send + 'static,
163 TMuxer: StreamMuxer + Send + Sync + 'static,
164 TMuxer::OutboundSubstream: Send,
165 TInEvent: Send + 'static,
166 TOutEvent: Send + 'static,
167 THandler: IntoConnectionHandler + Send + 'static,
168 THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
169 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
170 <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
171{
172 pub fn is_connected(&self) -> bool {
176 match self {
177 Peer::Connected(..) => true,
178 Peer::Dialing(peer) => peer.is_connected(),
179 Peer::Disconnected(..) => false,
180 Peer::Local => false
181 }
182 }
183
184 pub fn is_dialing(&self) -> bool {
188 match self {
189 Peer::Dialing(_) => true,
190 Peer::Connected(peer) => peer.is_dialing(),
191 Peer::Disconnected(..) => false,
192 Peer::Local => false
193 }
194 }
195
196 pub fn is_disconnected(&self) -> bool {
200 matches!(self, Peer::Disconnected(..))
201 }
202
203 pub fn dial<I>(self, address: Multiaddr, remaining: I, handler: THandler)
211 -> Result<
212 (ConnectionId, DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>),
213 ConnectionLimit
214 >
215 where
216 I: IntoIterator<Item = Multiaddr>,
217 {
218 let (peer_id, network) = match self {
219 Peer::Connected(p) => (p.peer_id, p.network),
220 Peer::Dialing(p) => (p.peer_id, p.network),
221 Peer::Disconnected(p) => (p.peer_id, p.network),
222 Peer::Local => return Err(ConnectionLimit { current: 0, limit: 0 })
223 };
224
225 let id = network.dial_peer(DialingOpts {
226 peer: peer_id,
227 handler,
228 address,
229 remaining: remaining.into_iter().collect(),
230 })?;
231
232 Ok((id, DialingPeer { network, peer_id }))
233 }
234
235 pub fn into_connected(self) -> Option<
239 ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
240 > {
241 match self {
242 Peer::Connected(peer) => Some(peer),
243 Peer::Dialing(peer) => peer.into_connected(),
244 Peer::Disconnected(..) => None,
245 Peer::Local => None,
246 }
247 }
248
249 pub fn into_dialing(self) -> Option<
253 DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
254 > {
255 match self {
256 Peer::Dialing(peer) => Some(peer),
257 Peer::Connected(peer) => peer.into_dialing(),
258 Peer::Disconnected(..) => None,
259 Peer::Local => None
260 }
261 }
262
263 pub fn into_disconnected(self) -> Option<
266 DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
267 > {
268 match self {
269 Peer::Disconnected(peer) => Some(peer),
270 _ => None,
271 }
272 }
273}
274
275pub struct ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
279where
280 TTrans: Transport,
281 THandler: IntoConnectionHandler,
282{
283 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
284 peer_id: PeerId,
285}
286
287impl<'a, TTrans, TInEvent, TOutEvent, THandler>
288 ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
289where
290 TTrans: Transport,
291 THandler: IntoConnectionHandler,
292{
293 pub fn id(&self) -> &PeerId {
294 &self.peer_id
295 }
296
297 pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
299 Peer::Connected(self)
300 }
301
302 pub fn connection(&mut self, id: ConnectionId)
304 -> Option<EstablishedConnection<TInEvent>>
305 {
306 self.network.pool.get_established(id)
307 }
308
309 pub fn num_connections(&self) -> u32 {
311 self.network.pool.num_peer_established(&self.peer_id)
312 }
313
314 pub fn is_dialing(&self) -> bool {
318 self.network.dialing.contains_key(&self.peer_id)
319 }
320
321 pub fn into_dialing(self) -> Option<
324 DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
325 > {
326 if self.network.dialing.contains_key(&self.peer_id) {
327 Some(DialingPeer { network: self.network, peer_id: self.peer_id })
328 } else {
329 None
330 }
331 }
332
333 pub fn connections(&mut self) ->
335 EstablishedConnectionIter<
336 impl Iterator<Item = ConnectionId>,
337 TInEvent,
338 TOutEvent,
339 THandler,
340 TTrans::Error,
341 <THandler::Handler as ConnectionHandler>::Error>
342 {
343 self.network.pool.iter_peer_established(&self.peer_id)
344 }
345
346 pub fn some_connection(&mut self)
348 -> EstablishedConnection<TInEvent>
349 {
350 self.connections()
351 .into_first()
352 .expect("By `Peer::new` and the definition of `ConnectedPeer`.")
353 }
354
355 pub fn disconnect(self)
357 -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
358 {
359 self.network.disconnect(&self.peer_id);
360 DisconnectedPeer { network: self.network, peer_id: self.peer_id }
361 }
362}
363
364impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
365 ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
366where
367 TTrans: Transport,
368 THandler: IntoConnectionHandler,
369{
370 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
371 f.debug_struct("ConnectedPeer")
372 .field("peer_id", &self.peer_id)
373 .field("established", &self.network.pool.iter_peer_established_info(&self.peer_id))
374 .field("attempts", &self.network.dialing.get(&self.peer_id))
375 .finish()
376 }
377}
378
379pub struct DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
383where
384 TTrans: Transport,
385 THandler: IntoConnectionHandler,
386{
387 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
388 peer_id: PeerId,
389}
390
391impl<'a, TTrans, TInEvent, TOutEvent, THandler>
392 DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
393where
394 TTrans: Transport,
395 THandler: IntoConnectionHandler,
396{
397 pub fn id(&self) -> &PeerId {
398 &self.peer_id
399 }
400
401 pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
403 Peer::Dialing(self)
404 }
405
406 pub fn disconnect(self)
409 -> DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
410 {
411 self.network.disconnect(&self.peer_id);
412 DisconnectedPeer { network: self.network, peer_id: self.peer_id }
413 }
414
415 pub fn is_connected(&self) -> bool {
419 self.network.pool.is_connected(&self.peer_id)
420 }
421
422 pub fn into_connected(self)
424 -> Option<ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>>
425 {
426 if self.is_connected() {
427 Some(ConnectedPeer { peer_id: self.peer_id, network: self.network })
428 } else {
429 None
430 }
431 }
432
433 pub fn attempt(&mut self, id: ConnectionId)
436 -> Option<DialingAttempt<'_, TInEvent>>
437 {
438 if let hash_map::Entry::Occupied(attempts) = self.network.dialing.entry(self.peer_id) {
439 if let Some(pos) = attempts.get().iter().position(|s| s.current.0 == id) {
440 if let Some(inner) = self.network.pool.get_outgoing(id) {
441 return Some(DialingAttempt { pos, inner, attempts })
442 }
443 }
444 }
445 None
446 }
447
448 pub fn attempts(&mut self)
450 -> DialingAttemptIter<'_,
451 TInEvent,
452 TOutEvent,
453 THandler,
454 TTrans::Error,
455 <THandler::Handler as ConnectionHandler>::Error>
456 {
457 DialingAttemptIter::new(&self.peer_id, &mut self.network.pool, &mut self.network.dialing)
458 }
459
460 pub fn some_attempt(&mut self)
464 -> DialingAttempt<'_, TInEvent>
465 {
466 self.attempts()
467 .into_first()
468 .expect("By `Peer::new` and the definition of `DialingPeer`.")
469 }
470}
471
472impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
473 DialingPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
474where
475 TTrans: Transport,
476 THandler: IntoConnectionHandler,
477{
478 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
479 f.debug_struct("DialingPeer")
480 .field("peer_id", &self.peer_id)
481 .field("established", &self.network.pool.iter_peer_established_info(&self.peer_id))
482 .field("attempts", &self.network.dialing.get(&self.peer_id))
483 .finish()
484 }
485}
486
487pub struct DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
491where
492 TTrans: Transport,
493 THandler: IntoConnectionHandler,
494{
495 peer_id: PeerId,
496 network: &'a mut Network<TTrans, TInEvent, TOutEvent, THandler>,
497}
498
499impl<'a, TTrans, TInEvent, TOutEvent, THandler> fmt::Debug for
500 DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
501where
502 TTrans: Transport,
503 THandler: IntoConnectionHandler,
504{
505 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
506 f.debug_struct("DisconnectedPeer")
507 .field("peer_id", &self.peer_id)
508 .finish()
509 }
510}
511
512impl<'a, TTrans, TInEvent, TOutEvent, THandler>
513 DisconnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>
514where
515 TTrans: Transport,
516 THandler: IntoConnectionHandler,
517{
518 pub fn id(&self) -> &PeerId {
519 &self.peer_id
520 }
521
522 pub fn into_peer(self) -> Peer<'a, TTrans, TInEvent, TOutEvent, THandler> {
524 Peer::Disconnected(self)
525 }
526
527 pub fn set_connected<TMuxer>(
536 self,
537 connected: Connected,
538 connection: Connection<TMuxer, THandler::Handler>,
539 ) -> Result<
540 ConnectedPeer<'a, TTrans, TInEvent, TOutEvent, THandler>,
541 ConnectionLimit
542 > where
543 TInEvent: Send + 'static,
544 TOutEvent: Send + 'static,
545 THandler: Send + 'static,
546 TTrans::Error: Send + 'static,
547 THandler::Handler: ConnectionHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent> + Send,
548 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send,
549 <THandler::Handler as ConnectionHandler>::Error: error::Error + Send + 'static,
550 TMuxer: StreamMuxer + Send + Sync + 'static,
551 TMuxer::OutboundSubstream: Send,
552 {
553 if connected.peer_id != self.peer_id {
554 panic!("Invalid peer ID given: {:?}. Expected: {:?}", connected.peer_id, self.peer_id)
555 }
556
557 self.network.pool.add(connection, connected)
558 .map(move |_id| ConnectedPeer {
559 network: self.network,
560 peer_id: self.peer_id,
561 })
562 }
563}
564
565#[derive(Debug, Clone)]
568pub(super) struct DialingState {
569 pub(super) current: (ConnectionId, Multiaddr),
571 pub(super) remaining: Vec<Multiaddr>,
573}
574
575pub struct DialingAttempt<'a, TInEvent> {
579 inner: PendingConnection<'a, TInEvent>,
581 attempts: hash_map::OccupiedEntry<'a, PeerId, SmallVec<[DialingState; 10]>>,
583 pos: usize,
585}
586
587impl<'a, TInEvent>
588 DialingAttempt<'a, TInEvent>
589{
590 pub fn id(&self) -> ConnectionId {
592 self.inner.id()
593 }
594
595 pub fn peer_id(&self) -> &PeerId {
597 self.attempts.key()
598 }
599
600 pub fn address(&self) -> &Multiaddr {
602 match self.inner.endpoint() {
603 ConnectedPoint::Dialer { address } => address,
604 ConnectedPoint::Listener { .. } => unreachable!("by definition of a `DialingAttempt`.")
605 }
606 }
607
608 pub fn abort(mut self) {
614 self.attempts.get_mut().remove(self.pos);
615 if self.attempts.get().is_empty() {
616 self.attempts.remove();
617 }
618 self.inner.abort();
619 }
620
621 pub fn add_address(&mut self, addr: Multiaddr) {
624 let remaining = &mut self.attempts.get_mut()[self.pos].remaining;
625 if remaining.iter().all(|a| a != &addr) {
626 remaining.push(addr);
627 }
628 }
629}
630
631pub struct DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
633 peer_id: &'a PeerId,
635 pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
637 dialing: &'a mut FnvHashMap<PeerId, SmallVec<[DialingState; 10]>>,
643 pos: usize,
645 end: usize,
647}
648
649impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
653 DialingAttemptIter<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
654{
655 fn new(
656 peer_id: &'a PeerId,
657 pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
658 dialing: &'a mut FnvHashMap<PeerId, SmallVec<[DialingState; 10]>>,
659 ) -> Self {
660 let end = dialing.get(peer_id).map_or(0, |conns| conns.len());
661 Self { pos: 0, end, pool, dialing, peer_id }
662 }
663
664 #[allow(clippy::should_implement_trait)]
666 pub fn next(&mut self) -> Option<DialingAttempt<'_, TInEvent>> {
667 let end = self.dialing.get(self.peer_id).map_or(0, |conns| conns.len());
671 if self.end > end {
672 self.end = end;
673 self.pos -= 1;
674 }
675
676 if self.pos == self.end {
677 return None
678 }
679
680 if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(*self.peer_id) {
681 let id = attempts.get()[self.pos].current.0;
682 if let Some(inner) = self.pool.get_outgoing(id) {
683 let conn = DialingAttempt { pos: self.pos, inner, attempts };
684 self.pos += 1;
685 return Some(conn)
686 }
687 }
688
689 None
690 }
691
692 pub fn into_first<'b>(self)
694 -> Option<DialingAttempt<'b, TInEvent>>
695 where 'a: 'b
696 {
697 if self.pos == self.end {
698 return None
699 }
700
701 if let hash_map::Entry::Occupied(attempts) = self.dialing.entry(*self.peer_id) {
702 let id = attempts.get()[self.pos].current.0;
703 if let Some(inner) = self.pool.get_outgoing(id) {
704 return Some(DialingAttempt { pos: self.pos, inner, attempts })
705 }
706 }
707
708 None
709 }
710}