1use crate::{
22 ConnectedPoint,
23 PeerId,
24 connection::{
25 self,
26 Connected,
27 Connection,
28 ConnectionId,
29 ConnectionLimit,
30 ConnectionError,
31 ConnectionHandler,
32 IncomingInfo,
33 IntoConnectionHandler,
34 OutgoingInfo,
35 Substream,
36 PendingConnectionError,
37 manager::{self, Manager, ManagerConfig},
38 },
39 muxing::StreamMuxer,
40};
41use either::Either;
42use fnv::FnvHashMap;
43use futures::prelude::*;
44use smallvec::SmallVec;
45use std::{convert::TryFrom as _, error, fmt, num::NonZeroU32, task::Context, task::Poll};
46
/// A pool of connections that tracks both pending and established connections.
pub struct Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
    /// The ID of the local peer. `add_pending` rejects any connection whose
    /// remote reports this same ID.
    local_id: PeerId,

    /// The connection counters together with the configured limits.
    counters: ConnectionCounters,

    /// The connection manager the pool delegates to for adding, looking up
    /// and polling connections.
    manager: Manager<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,

    /// The endpoints of the established connections, grouped by peer and
    /// keyed by connection ID.
    established: FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,

    /// The endpoint and, if known, the expected peer of each pending connection.
    pending: FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,

    /// Connections removed by `Pool::disconnect` whose `ConnectionClosed`
    /// events still have to be emitted by `Pool::poll`.
    disconnected: Vec<Disconnected>,
}
73
74impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
75for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
76{
77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
78 f.debug_struct("Pool")
79 .field("counters", &self.counters)
80 .finish()
81 }
82}
83
// `Pool` is declared `Unpin` unconditionally, i.e. without requiring `Unpin`
// of any of its type parameters.
impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> Unpin
for Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {}
86
/// Event that can happen on the `Pool`, as returned by `Pool::poll`.
pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
    /// A new connection has been established.
    ConnectionEstablished {
        /// Handle to the newly established connection.
        connection: EstablishedConnection<'a, TInEvent>,
        /// Number of established connections to the same peer, including this one.
        num_established: NonZeroU32,
    },

    /// An established connection was closed.
    ///
    /// Emitted both for connections reported closed by the manager and for
    /// connections removed through `Pool::disconnect`.
    ConnectionClosed {
        /// The ID of the closed connection.
        id: ConnectionId,
        /// Information about the closed connection.
        connected: Connected,
        /// The error reported by the manager, if any. Always `None` when the
        /// connection was removed through `Pool::disconnect`.
        error: Option<ConnectionError<THandlerErr>>,
        /// A reference to the pool that used to manage the connection.
        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
        /// The remaining number of established connections to the same peer.
        num_established: u32,
    },

    /// A pending connection failed, or was rejected on establishment because
    /// an established-connection limit had been reached.
    PendingConnectionError {
        /// The ID of the failed connection.
        id: ConnectionId,
        /// The local endpoint of the failed connection.
        endpoint: ConnectedPoint,
        /// The error that occurred.
        error: PendingConnectionError<TTransErr>,
        /// The handler that was supposed to handle the connection. `None`
        /// when the connection was rejected due to a connection limit.
        handler: Option<THandler>,
        /// The (expected) peer of the failed connection, if known.
        peer: Option<PeerId>,
        /// A reference to the pool that managed the connection.
        pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
    },

    /// The manager reported an event for an established connection.
    ConnectionEvent {
        /// The connection that produced the event.
        connection: EstablishedConnection<'a, TInEvent>,
        /// The produced event.
        event: TOutEvent,
    },

    /// The remote endpoint of an established connection has changed.
    AddressChange {
        /// The connection whose address has changed.
        connection: EstablishedConnection<'a, TInEvent>,
        /// The new endpoint, already recorded in the pool.
        new_endpoint: ConnectedPoint,
        /// The old endpoint.
        old_endpoint: ConnectedPoint,
    },
}
154
155impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> fmt::Debug
156for PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
157where
158 TOutEvent: fmt::Debug,
159 TTransErr: fmt::Debug,
160 THandlerErr: fmt::Debug,
161 TInEvent: fmt::Debug,
162{
163 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
164 match *self {
165 PoolEvent::ConnectionEstablished { ref connection, .. } => {
166 f.debug_tuple("PoolEvent::ConnectionEstablished")
167 .field(connection)
168 .finish()
169 },
170 PoolEvent::ConnectionClosed { ref id, ref connected, ref error, .. } => {
171 f.debug_struct("PoolEvent::ConnectionClosed")
172 .field("id", id)
173 .field("connected", connected)
174 .field("error", error)
175 .finish()
176 },
177 PoolEvent::PendingConnectionError { ref id, ref error, .. } => {
178 f.debug_struct("PoolEvent::PendingConnectionError")
179 .field("id", id)
180 .field("error", error)
181 .finish()
182 },
183 PoolEvent::ConnectionEvent { ref connection, ref event } => {
184 f.debug_struct("PoolEvent::ConnectionEvent")
185 .field("peer", connection.peer_id())
186 .field("event", event)
187 .finish()
188 },
189 PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
190 f.debug_struct("PoolEvent::AddressChange")
191 .field("peer", connection.peer_id())
192 .field("new_endpoint", new_endpoint)
193 .field("old_endpoint", old_endpoint)
194 .finish()
195 },
196 }
197 }
198}
199
200impl<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
201 Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
202{
203 pub fn new(
205 local_id: PeerId,
206 manager_config: ManagerConfig,
207 limits: ConnectionLimits
208 ) -> Self {
209 Pool {
210 local_id,
211 counters: ConnectionCounters::new(limits),
212 manager: Manager::new(manager_config),
213 established: Default::default(),
214 pending: Default::default(),
215 disconnected: Vec::new(),
216 }
217 }
218
219 pub fn counters(&self) -> &ConnectionCounters {
221 &self.counters
222 }
223
224 pub fn add_incoming<TFut, TMuxer>(
230 &mut self,
231 future: TFut,
232 handler: THandler,
233 info: IncomingInfo<'_>,
234 ) -> Result<ConnectionId, ConnectionLimit>
235 where
236 TFut: Future<
237 Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
238 > + Send + 'static,
239 THandler: IntoConnectionHandler + Send + 'static,
240 THandler::Handler: ConnectionHandler<
241 Substream = Substream<TMuxer>,
242 InEvent = TInEvent,
243 OutEvent = TOutEvent,
244 Error = THandlerErr
245 > + Send + 'static,
246 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
247 TTransErr: error::Error + Send + 'static,
248 THandlerErr: error::Error + Send + 'static,
249 TInEvent: Send + 'static,
250 TOutEvent: Send + 'static,
251 TMuxer: StreamMuxer + Send + Sync + 'static,
252 TMuxer::OutboundSubstream: Send + 'static,
253 {
254 self.counters.check_max_pending_incoming()?;
255 let endpoint = info.to_connected_point();
256 Ok(self.add_pending(future, handler, endpoint, None))
257 }
258
259 pub fn add_outgoing<TFut, TMuxer>(
265 &mut self,
266 future: TFut,
267 handler: THandler,
268 info: OutgoingInfo<'_>,
269 ) -> Result<ConnectionId, ConnectionLimit>
270 where
271 TFut: Future<
272 Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
273 > + Send + 'static,
274 THandler: IntoConnectionHandler + Send + 'static,
275 THandler::Handler: ConnectionHandler<
276 Substream = Substream<TMuxer>,
277 InEvent = TInEvent,
278 OutEvent = TOutEvent,
279 Error = THandlerErr
280 > + Send + 'static,
281 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
282 TTransErr: error::Error + Send + 'static,
283 THandlerErr: error::Error + Send + 'static,
284 TInEvent: Send + 'static,
285 TOutEvent: Send + 'static,
286 TMuxer: StreamMuxer + Send + Sync + 'static,
287 TMuxer::OutboundSubstream: Send + 'static,
288 {
289 self.counters.check_max_pending_outgoing()?;
290 let endpoint = info.to_connected_point();
291 Ok(self.add_pending(future, handler, endpoint, info.peer_id.cloned()))
292 }
293
294 fn add_pending<TFut, TMuxer>(
297 &mut self,
298 future: TFut,
299 handler: THandler,
300 endpoint: ConnectedPoint,
301 peer: Option<PeerId>,
302 ) -> ConnectionId
303 where
304 TFut: Future<
305 Output = Result<(PeerId, TMuxer), PendingConnectionError<TTransErr>>
306 > + Send + 'static,
307 THandler: IntoConnectionHandler + Send + 'static,
308 THandler::Handler: ConnectionHandler<
309 Substream = Substream<TMuxer>,
310 InEvent = TInEvent,
311 OutEvent = TOutEvent,
312 Error = THandlerErr
313 > + Send + 'static,
314 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
315 TTransErr: error::Error + Send + 'static,
316 THandlerErr: error::Error + Send + 'static,
317 TInEvent: Send + 'static,
318 TOutEvent: Send + 'static,
319 TMuxer: StreamMuxer + Send + Sync + 'static,
320 TMuxer::OutboundSubstream: Send + 'static,
321 {
322 let future = future.and_then({
327 let endpoint = endpoint.clone();
328 let expected_peer = peer.clone();
329 let local_id = self.local_id.clone();
330 move |(peer_id, muxer)| {
331 if let Some(peer) = expected_peer {
332 if peer != peer_id {
333 return future::err(PendingConnectionError::InvalidPeerId)
334 }
335 }
336
337 if local_id == peer_id {
338 return future::err(PendingConnectionError::InvalidPeerId)
339 }
340
341 let connected = Connected { peer_id, endpoint };
342 future::ready(Ok((connected, muxer)))
343 }
344 });
345
346 let id = self.manager.add_pending(future, handler);
347 self.counters.inc_pending(&endpoint);
348 self.pending.insert(id, (endpoint, peer));
349 id
350 }
351
352 pub fn add<TMuxer>(&mut self, c: Connection<TMuxer, THandler::Handler>, i: Connected)
358 -> Result<ConnectionId, ConnectionLimit>
359 where
360 THandler: IntoConnectionHandler + Send + 'static,
361 THandler::Handler: ConnectionHandler<
362 Substream = connection::Substream<TMuxer>,
363 InEvent = TInEvent,
364 OutEvent = TOutEvent,
365 Error = THandlerErr
366 > + Send + 'static,
367 <THandler::Handler as ConnectionHandler>::OutboundOpenInfo: Send + 'static,
368 TTransErr: error::Error + Send + 'static,
369 THandlerErr: error::Error + Send + 'static,
370 TInEvent: Send + 'static,
371 TOutEvent: Send + 'static,
372 TMuxer: StreamMuxer + Send + Sync + 'static,
373 TMuxer::OutboundSubstream: Send + 'static,
374 {
375 self.counters.check_max_established(&i.endpoint)?;
376 self.counters.check_max_established_per_peer(self.num_peer_established(&i.peer_id))?;
377 let id = self.manager.add(c, i.clone());
378 self.counters.inc_established(&i.endpoint);
379 self.established.entry(i.peer_id.clone()).or_default().insert(id, i.endpoint);
380 Ok(id)
381 }
382
383 pub fn get(&mut self, id: ConnectionId)
387 -> Option<PoolConnection<'_, TInEvent>>
388 {
389 match self.manager.entry(id) {
390 Some(manager::Entry::Established(entry)) =>
391 Some(PoolConnection::Established(EstablishedConnection {
392 entry
393 })),
394 Some(manager::Entry::Pending(entry)) =>
395 Some(PoolConnection::Pending(PendingConnection {
396 entry,
397 pending: &mut self.pending,
398 counters: &mut self.counters,
399 })),
400 None => None
401 }
402 }
403
404 pub fn get_established(&mut self, id: ConnectionId)
406 -> Option<EstablishedConnection<'_, TInEvent>>
407 {
408 match self.get(id) {
409 Some(PoolConnection::Established(c)) => Some(c),
410 _ => None
411 }
412 }
413
414 pub fn get_outgoing(&mut self, id: ConnectionId)
416 -> Option<PendingConnection<'_, TInEvent>>
417 {
418 match self.pending.get(&id) {
419 Some((ConnectedPoint::Dialer { .. }, _peer)) =>
420 match self.manager.entry(id) {
421 Some(manager::Entry::Pending(entry)) =>
422 Some(PendingConnection {
423 entry,
424 pending: &mut self.pending,
425 counters: &mut self.counters,
426 }),
427 _ => unreachable!("by consistency of `self.pending` with `self.manager`")
428 }
429 _ => None
430 }
431 }
432
433 pub fn is_connected(&self, id: &PeerId) -> bool {
437 self.established.contains_key(id)
438 }
439
440 pub fn num_peers(&self) -> usize {
443 self.established.len()
444 }
445
446 pub fn disconnect(&mut self, peer: &PeerId) {
456 if let Some(conns) = self.established.get(peer) {
457 let mut num_established = 0;
459 for (&id, endpoint) in conns.iter() {
460 if let Some(manager::Entry::Established(e)) = self.manager.entry(id) {
461 let connected = e.remove();
462 self.disconnected.push(Disconnected {
463 id, connected, num_established
464 });
465 num_established += 1;
466 }
467 self.counters.dec_established(endpoint);
468 }
469 }
470 self.established.remove(peer);
471
472 let mut aborted = Vec::new();
473 for (&id, (_endpoint, peer2)) in &self.pending {
474 if Some(peer) == peer2.as_ref() {
475 if let Some(manager::Entry::Pending(e)) = self.manager.entry(id) {
476 e.abort();
477 aborted.push(id);
478 }
479 }
480 }
481 for id in aborted {
482 if let Some((endpoint, _)) = self.pending.remove(&id) {
483 self.counters.dec_pending(&endpoint);
484 }
485 }
486 }
487
488 pub fn num_peer_established(&self, peer: &PeerId) -> u32 {
490 num_peer_established(&self.established, peer)
491 }
492
493 pub fn iter_peer_established<'a>(&'a mut self, peer: &PeerId)
495 -> EstablishedConnectionIter<'a,
496 impl Iterator<Item = ConnectionId>,
497 TInEvent,
498 TOutEvent,
499 THandler,
500 TTransErr,
501 THandlerErr>
502 {
503 let ids = self.iter_peer_established_info(peer)
504 .map(|(id, _endpoint)| *id)
505 .collect::<SmallVec<[ConnectionId; 10]>>()
506 .into_iter();
507
508 EstablishedConnectionIter { pool: self, ids }
509 }
510
511 pub fn iter_pending_incoming(&self) -> impl Iterator<Item = IncomingInfo<'_>> {
513 self.iter_pending_info()
514 .filter_map(|(_, ref endpoint, _)| {
515 match endpoint {
516 ConnectedPoint::Listener { local_addr, send_back_addr } => {
517 Some(IncomingInfo { local_addr, send_back_addr })
518 },
519 ConnectedPoint::Dialer { .. } => None,
520 }
521 })
522 }
523
524 pub fn iter_pending_outgoing(&self) -> impl Iterator<Item = OutgoingInfo<'_>> {
526 self.iter_pending_info()
527 .filter_map(|(_, ref endpoint, ref peer_id)| {
528 match endpoint {
529 ConnectedPoint::Listener { .. } => None,
530 ConnectedPoint::Dialer { address } =>
531 Some(OutgoingInfo { address, peer_id: peer_id.as_ref() }),
532 }
533 })
534 }
535
536 pub fn iter_peer_established_info(&self, peer: &PeerId)
539 -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint)> + fmt::Debug + '_
540 {
541 match self.established.get(peer) {
542 Some(conns) => Either::Left(conns.iter()),
543 None => Either::Right(std::iter::empty())
544 }
545 }
546
547 pub fn iter_pending_info(&self)
550 -> impl Iterator<Item = (&ConnectionId, &ConnectedPoint, &Option<PeerId>)> + '_
551 {
552 self.pending.iter().map(|(id, (endpoint, info))| (id, endpoint, info))
553 }
554
555 pub fn iter_connected<'a>(&'a self) -> impl Iterator<Item = &'a PeerId> + 'a {
558 self.established.keys()
559 }
560
    /// Polls the connection pool for the next event.
    ///
    /// Events queued by `Pool::disconnect` are emitted first, one per call.
    /// Only when none are left is the connection `Manager` polled; its events
    /// are used to update the pool's bookkeeping (`pending`, `established`,
    /// `counters`) and are translated into `PoolEvent`s.
    ///
    /// Returns `Poll::Pending` when the manager has no event ready.
    pub fn poll<'a>(&'a mut self, cx: &mut Context<'_>) -> Poll<
        PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
    > {
        // Drain events resulting from forced disconnections.
        //
        // `Pool::disconnect` pushes the entries with an ascending
        // `num_established`, so popping from the end reports the closed
        // connections with the count going down.
        if let Some(Disconnected {
            id, connected, num_established
        }) = self.disconnected.pop() {
            return Poll::Ready(PoolEvent::ConnectionClosed {
                id,
                connected,
                num_established,
                error: None,
                pool: self,
            })
        }

        // Poll the connection `Manager`. Manager events for connections the
        // pool no longer tracks as pending produce no `PoolEvent`; in that
        // case the loop polls the manager again.
        loop {
            let item = match self.manager.poll(cx) {
                Poll::Ready(item) => item,
                Poll::Pending => return Poll::Pending,
            };

            match item {
                manager::Event::PendingConnectionError { id, error, handler } => {
                    // Only reported if the pool still tracks the connection as pending.
                    if let Some((endpoint, peer)) = self.pending.remove(&id) {
                        self.counters.dec_pending(&endpoint);
                        return Poll::Ready(PoolEvent::PendingConnectionError {
                            id,
                            endpoint,
                            error,
                            handler: Some(handler),
                            peer,
                            pool: self
                        })
                    }
                },
                manager::Event::ConnectionClosed { id, connected, error } => {
                    // Forget the connection and compute how many established
                    // connections to the same peer remain.
                    let num_established =
                        if let Some(conns) = self.established.get_mut(&connected.peer_id) {
                            if let Some(endpoint) = conns.remove(&id) {
                                self.counters.dec_established(&endpoint);
                            }
                            u32::try_from(conns.len()).unwrap()
                        } else {
                            0
                        };
                    // Drop the peer's entry altogether once its last connection is gone.
                    if num_established == 0 {
                        self.established.remove(&connected.peer_id);
                    }
                    return Poll::Ready(PoolEvent::ConnectionClosed {
                        id, connected, error, num_established, pool: self
                    })
                }
                manager::Event::ConnectionEstablished { entry } => {
                    let id = entry.id();
                    if let Some((endpoint, peer)) = self.pending.remove(&id) {
                        // The connection is no longer pending, whatever happens next.
                        self.counters.dec_pending(&endpoint);

                        // Check the general established connection limit. On
                        // failure the connection is removed from the manager
                        // and reported as a pending connection error without
                        // a handler.
                        if let Err(e) = self.counters.check_max_established(&endpoint) {
                            let connected = entry.remove();
                            return Poll::Ready(PoolEvent::PendingConnectionError {
                                id,
                                endpoint: connected.endpoint,
                                error: PendingConnectionError::ConnectionLimit(e),
                                handler: None,
                                peer,
                                pool: self
                            })
                        }

                        // Check the per-peer established connection limit,
                        // handled in the same way.
                        let current = num_peer_established(&self.established, &entry.connected().peer_id);
                        if let Err(e) = self.counters.check_max_established_per_peer(current) {
                            let connected = entry.remove();
                            return Poll::Ready(PoolEvent::PendingConnectionError {
                                id,
                                endpoint: connected.endpoint,
                                error: PendingConnectionError::ConnectionLimit(e),
                                handler: None,
                                peer,
                                pool: self
                            })
                        }

                        // Peer ID checks must already have happened in the
                        // future wrapped by `add_pending`; re-check them in
                        // debug builds only.
                        if cfg!(debug_assertions) {
                            if self.local_id == entry.connected().peer_id {
                                panic!("Unexpected local peer ID for remote.");
                            }
                            if let Some(peer) = peer {
                                if peer != entry.connected().peer_id {
                                    panic!("Unexpected peer ID mismatch.");
                                }
                            }
                        }

                        // Add the connection to the pool. `num_established`
                        // is computed before the insertion, hence the `+ 1`.
                        let peer = entry.connected().peer_id.clone();
                        let conns = self.established.entry(peer).or_default();
                        let num_established = NonZeroU32::new(u32::try_from(conns.len() + 1).unwrap())
                            .expect("n + 1 is always non-zero; qed");
                        self.counters.inc_established(&endpoint);
                        conns.insert(id, endpoint);
                        // Look the connection up again through `self.get` to
                        // build the handle placed in the event.
                        match self.get(id) {
                            Some(PoolConnection::Established(connection)) =>
                                return Poll::Ready(PoolEvent::ConnectionEstablished {
                                    connection, num_established
                                }),
                            _ => unreachable!("since `entry` is an `EstablishedEntry`.")
                        }
                    }
                },
                manager::Event::ConnectionEvent { entry, event } => {
                    let id = entry.id();
                    match self.get(id) {
                        Some(PoolConnection::Established(connection)) =>
                            return Poll::Ready(PoolEvent::ConnectionEvent {
                                connection,
                                event,
                            }),
                        _ => unreachable!("since `entry` is an `EstablishedEntry`.")
                    }
                },
                manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
                    let id = entry.id();

                    // Record the new endpoint in the pool's own bookkeeping
                    // before reporting the change.
                    match self.established.get_mut(&entry.connected().peer_id) {
                        Some(list) => *list.get_mut(&id)
                            .expect("state inconsistency: entry is `EstablishedEntry` but absent \
                                from `established`") = new_endpoint.clone(),
                        None => unreachable!("since `entry` is an `EstablishedEntry`.")
                    };

                    match self.get(id) {
                        Some(PoolConnection::Established(connection)) =>
                            return Poll::Ready(PoolEvent::AddressChange {
                                connection,
                                new_endpoint,
                                old_endpoint,
                            }),
                        _ => unreachable!("since `entry` is an `EstablishedEntry`.")
                    }
                },
            }
        }
    }
719
720}
721
/// A connection in a `Pool`, as returned by `Pool::get`.
pub enum PoolConnection<'a, TInEvent> {
    /// A connection that the manager still reports as pending.
    Pending(PendingConnection<'a, TInEvent>),
    /// A connection that the manager reports as established.
    Established(EstablishedConnection<'a, TInEvent>),
}
727
728pub struct PendingConnection<'a, TInEvent> {
730 entry: manager::PendingEntry<'a, TInEvent>,
731 pending: &'a mut FnvHashMap<ConnectionId, (ConnectedPoint, Option<PeerId>)>,
732 counters: &'a mut ConnectionCounters,
733}
734
735impl<TInEvent>
736 PendingConnection<'_, TInEvent>
737{
738 pub fn id(&self) -> ConnectionId {
740 self.entry.id()
741 }
742
743 pub fn peer_id(&self) -> &Option<PeerId> {
745 &self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").1
746 }
747
748 pub fn endpoint(&self) -> &ConnectedPoint {
750 &self.pending.get(&self.entry.id()).expect("`entry` is a pending entry").0
751 }
752
753 pub fn abort(self) {
755 let endpoint = self.pending.remove(&self.entry.id()).expect("`entry` is a pending entry").0;
756 self.counters.dec_pending(&endpoint);
757 self.entry.abort();
758 }
759}
760
761pub struct EstablishedConnection<'a, TInEvent> {
763 entry: manager::EstablishedEntry<'a, TInEvent>,
764}
765
766impl<TInEvent> fmt::Debug
767for EstablishedConnection<'_, TInEvent>
768where
769 TInEvent: fmt::Debug,
770{
771 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
772 f.debug_struct("EstablishedConnection")
773 .field("entry", &self.entry)
774 .finish()
775 }
776}
777
778impl<TInEvent> EstablishedConnection<'_, TInEvent> {
779 pub fn connected(&self) -> &Connected {
780 self.entry.connected()
781 }
782
783 pub fn endpoint(&self) -> &ConnectedPoint {
785 &self.entry.connected().endpoint
786 }
787
788 pub fn peer_id(&self) -> &PeerId {
790 &self.entry.connected().peer_id
791 }
792
793 pub fn id(&self) -> ConnectionId {
795 self.entry.id()
796 }
797
798 pub fn notify_handler(&mut self, event: TInEvent) -> Result<(), TInEvent> {
809 self.entry.notify_handler(event)
810 }
811
812 pub fn poll_ready_notify_handler(&mut self, cx: &mut Context<'_>) -> Poll<Result<(),()>> {
819 self.entry.poll_ready_notify_handler(cx)
820 }
821
822 pub fn start_close(self) {
826 self.entry.start_close()
827 }
828}
829
830pub struct EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr> {
832 pool: &'a mut Pool<TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>,
833 ids: I
834}
835
836impl<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
840 EstablishedConnectionIter<'a, I, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr>
841where
842 I: Iterator<Item = ConnectionId>
843{
844 pub fn next(&mut self) -> Option<EstablishedConnection<'_, TInEvent>>
846 {
847 while let Some(id) = self.ids.next() {
848 if self.pool.manager.is_established(&id) { match self.pool.manager.entry(id) {
850 Some(manager::Entry::Established(entry)) => {
851 return Some(EstablishedConnection { entry })
852 }
853 _ => panic!("Established entry not found in manager.") }
855 }
856 }
857 None
858 }
859
860 pub fn into_ids(self) -> impl Iterator<Item = ConnectionId> {
862 self.ids
863 }
864
865 pub fn into_first<'b>(mut self)
867 -> Option<EstablishedConnection<'b, TInEvent>>
868 where 'a: 'b
869 {
870 while let Some(id) = self.ids.next() {
871 if self.pool.manager.is_established(&id) { match self.pool.manager.entry(id) {
873 Some(manager::Entry::Established(entry)) => {
874 return Some(EstablishedConnection { entry })
875 }
876 _ => panic!("Established entry not found in manager.") }
878 }
879 }
880 None
881 }
882}
883
884#[derive(Debug, Clone)]
886pub struct ConnectionCounters {
887 limits: ConnectionLimits,
889 pending_incoming: u32,
891 pending_outgoing: u32,
893 established_incoming: u32,
895 established_outgoing: u32,
897}
898
899impl ConnectionCounters {
900 fn new(limits: ConnectionLimits) -> Self {
901 Self {
902 limits,
903 pending_incoming: 0,
904 pending_outgoing: 0,
905 established_incoming: 0,
906 established_outgoing: 0,
907 }
908 }
909
910 pub fn limits(&self) -> &ConnectionLimits {
912 &self.limits
913 }
914
915 pub fn num_connections(&self) -> u32 {
917 self.num_pending() + self.num_established()
918 }
919
920 pub fn num_pending(&self) -> u32 {
922 self.pending_incoming + self.pending_outgoing
923 }
924
925 pub fn num_pending_incoming(&self) -> u32 {
927 self.pending_incoming
928 }
929
930 pub fn num_pending_outgoing(&self) -> u32 {
932 self.pending_outgoing
933 }
934
935 pub fn num_established_incoming(&self) -> u32 {
937 self.established_incoming
938 }
939
940 pub fn num_established_outgoing(&self) -> u32 {
942 self.established_outgoing
943 }
944
945 pub fn num_established(&self) -> u32 {
947 self.established_outgoing + self.established_incoming
948 }
949
950 fn inc_pending(&mut self, endpoint: &ConnectedPoint) {
951 match endpoint {
952 ConnectedPoint::Dialer { .. } => { self.pending_outgoing += 1; }
953 ConnectedPoint::Listener { .. } => { self.pending_incoming += 1; }
954 }
955 }
956
957 fn dec_pending(&mut self, endpoint: &ConnectedPoint) {
958 match endpoint {
959 ConnectedPoint::Dialer { .. } => { self.pending_outgoing -= 1; }
960 ConnectedPoint::Listener { .. } => { self.pending_incoming -= 1; }
961 }
962 }
963
964 fn inc_established(&mut self, endpoint: &ConnectedPoint) {
965 match endpoint {
966 ConnectedPoint::Dialer { .. } => { self.established_outgoing += 1; }
967 ConnectedPoint::Listener { .. } => { self.established_incoming += 1; }
968 }
969 }
970
971 fn dec_established(&mut self, endpoint: &ConnectedPoint) {
972 match endpoint {
973 ConnectedPoint::Dialer { .. } => { self.established_outgoing -= 1; }
974 ConnectedPoint::Listener { .. } => { self.established_incoming -= 1; }
975 }
976 }
977
978 fn check_max_pending_outgoing(&self) -> Result<(), ConnectionLimit> {
979 Self::check(self.pending_outgoing, self.limits.max_pending_outgoing)
980 }
981
982 fn check_max_pending_incoming(&self) -> Result<(), ConnectionLimit> {
983 Self::check(self.pending_incoming, self.limits.max_pending_incoming)
984 }
985
986 fn check_max_established(&self, endpoint: &ConnectedPoint)
987 -> Result<(), ConnectionLimit>
988 {
989 match endpoint {
990 ConnectedPoint::Dialer { .. } =>
991 Self::check(self.established_outgoing, self.limits.max_established_outgoing),
992 ConnectedPoint::Listener { .. } => {
993 Self::check(self.established_incoming, self.limits.max_established_incoming)
994 }
995 }
996 }
997
998 fn check_max_established_per_peer(&self, current: u32) -> Result<(), ConnectionLimit> {
999 Self::check(current, self.limits.max_established_per_peer)
1000 }
1001
1002 fn check(current: u32, limit: Option<u32>) -> Result<(), ConnectionLimit> {
1003 if let Some(limit) = limit {
1004 if current >= limit {
1005 return Err(ConnectionLimit { limit, current })
1006 }
1007 }
1008 Ok(())
1009 }
1010
1011}
1012
1013fn num_peer_established(
1015 established: &FnvHashMap<PeerId, FnvHashMap<ConnectionId, ConnectedPoint>>,
1016 peer: &PeerId
1017) -> u32 {
1018 established.get(peer).map_or(0, |conns|
1019 u32::try_from(conns.len())
1020 .expect("Unexpectedly large number of connections for a peer."))
1021}
1022
/// The configurable connection limits.
///
/// Every limit defaults to `None`, which `ConnectionCounters` treats as
/// "no limit".
#[derive(Debug, Clone, Default)]
pub struct ConnectionLimits {
    max_pending_incoming: Option<u32>,
    max_pending_outgoing: Option<u32>,
    max_established_incoming: Option<u32>,
    max_established_outgoing: Option<u32>,
    max_established_per_peer: Option<u32>,
}

impl ConnectionLimits {
    /// Configures the maximum number of concurrently incoming connections
    /// being established.
    pub fn with_max_pending_incoming(self, limit: Option<u32>) -> Self {
        Self { max_pending_incoming: limit, ..self }
    }

    /// Configures the maximum number of concurrently outgoing connections
    /// being established.
    pub fn with_max_pending_outgoing(self, limit: Option<u32>) -> Self {
        Self { max_pending_outgoing: limit, ..self }
    }

    /// Configures the maximum number of concurrent established inbound connections.
    pub fn with_max_established_incoming(self, limit: Option<u32>) -> Self {
        Self { max_established_incoming: limit, ..self }
    }

    /// Configures the maximum number of concurrent established outbound connections.
    pub fn with_max_established_outgoing(self, limit: Option<u32>) -> Self {
        Self { max_established_outgoing: limit, ..self }
    }

    /// Configures the maximum number of concurrent established connections
    /// per peer, regardless of direction (incoming or outgoing).
    pub fn with_max_established_per_peer(self, limit: Option<u32>) -> Self {
        Self { max_established_per_peer: limit, ..self }
    }
}
1067
/// Information about a connection removed by `Pool::disconnect`, kept until
/// `Pool::poll` reports it as a `PoolEvent::ConnectionClosed`.
struct Disconnected {
    /// The ID of the removed connection.
    id: ConnectionId,
    /// Information about the removed connection, as returned by the manager.
    connected: Connected,
    /// The value to report as `num_established` in the `ConnectionClosed`
    /// event. `Pool::disconnect` counts it upwards from 0 per peer.
    num_established: u32,
}