libp2p_relay_manager/
lib.rs

1mod handler;
2
3use std::{
4    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
5    error::Error,
6    hash::Hash,
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use futures::StreamExt;
12use futures_timer::Delay;
13use libp2p::core::transport::PortUse;
14use libp2p::{
15    core::Endpoint,
16    multiaddr::Protocol,
17    swarm::{
18        derive_prelude::{ConnectionEstablished, ListenerId},
19        dial_opts::DialOpts,
20        AddressChange, ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure,
21        ExpiredListenAddr, FromSwarm, ListenOpts, ListenerClosed, ListenerError, NetworkBehaviour,
22        NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
23    },
24    Multiaddr, PeerId,
25};
26use rand::seq::SliceRandom;
27
/// Events the relay manager reports to the owner of the swarm.
#[derive(Debug)]
pub enum Event {
    /// A relayed (`/p2p-circuit`) listen address was recorded for a relay
    /// connection that had no recorded addresses before.
    ReservationSuccessful {
        /// Peer of the relay connection the address belongs to.
        peer_id: PeerId,
        /// The first listen address recorded for that connection.
        initial_addr: Multiaddr,
    },
    /// The listener or the connection backing a reservation went away.
    ReservationClosed {
        /// Peer of the affected relay connection.
        peer_id: PeerId,
        /// `Ok(())`, or the error reported when the listener closed.
        result: Result<(), Box<dyn Error + Send>>,
    },
    /// A reservation could not be requested, or failed with an error.
    ReservationFailure {
        /// Peer the failure relates to.
        peer_id: PeerId,
        /// Description of what went wrong.
        result: Box<dyn Error + Send>,
    },
    // NOTE(review): this variant is not emitted anywhere in this file;
    // presumably it asks the application to look up relay candidates — confirm.
    FindRelays {
        /// Namespace of where to locate possible relay candidates
        namespace: Option<String>,
        /// Channel to send peer ids of the candidates
        channel: futures::channel::mpsc::Sender<HashSet<PeerId>>,
    },
}
49
/// Bookkeeping for one established connection to a relay peer.
#[derive(Debug, Clone)]
struct Connection {
    /// Peer on the other end of the connection.
    pub peer_id: PeerId,
    /// Swarm-assigned id; the only field used for equality.
    pub id: ConnectionId,
    /// Remote address taken from the connection endpoint; replaced when the
    /// swarm reports an address change.
    pub addr: Multiaddr,
    /// Whether this connection can be used for a reservation.
    pub candidacy: Candidate,
    /// The last three round-trip samples, oldest first. Slots that have not
    /// been filled yet hold `Duration::ZERO`.
    pub rtt: Option<[Duration; 3]>,
}
58
/// Relay suitability of a tracked connection.
#[derive(Debug, Clone)]
enum Candidate {
    /// Initial state of a newly tracked connection; the handler has not
    /// reported anything yet.
    Pending,
    /// The handler emitted `Out::Unsupported` for this connection.
    Unsupported,
    /// The handler emitted `Out::Supported` for this connection.
    Confirmed {
        /// Listener created for the reservation, once one has been requested.
        listener_id: Option<ListenerId>,
        /// Relayed listen addresses reported for that listener.
        addresses: Vec<Multiaddr>,
    },
}
68
69impl PartialEq for Connection {
70    fn eq(&self, other: &Self) -> bool {
71        self.id.eq(&other.id)
72    }
73}
74
// Marker impl: the id-based `PartialEq` above it is a full equivalence relation.
impl Eq for Connection {}
76
/// Identifies a reservation by peer, connection and listener.
///
/// Not referenced by the code visible in this file (hence `dead_code`).
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq)]
struct PendingReservation {
    /// Relay peer the reservation is requested from.
    peer_id: PeerId,
    /// Connection the reservation goes through.
    connection_id: ConnectionId,
    /// Listener created for the reservation; the only field fed to `Hash`.
    listener_id: ListenerId,
}
84
85impl Hash for PendingReservation {
86    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
87        self.listener_id.hash(state)
88    }
89}
90
/// Per-peer reconnect bookkeeping.
///
/// NOTE(review): only used as the value type of `Behaviour::reconnect`;
/// nothing in this file constructs or inspects it, so the field meanings
/// below are assumptions to confirm.
#[derive(Debug)]
#[allow(dead_code)]
enum ReconnectState {
    /// No dial in flight; presumably waiting for `delay` to elapse.
    Idle {
        backoff: bool,
        delay: Delay,
    },
    /// Presumably a dial identified by `connection_id` is in flight.
    Pending {
        connection_id: ConnectionId,
        backoff: bool,
    },
}
103
/// `NetworkBehaviour` that tracks relay peers and requests relayed listeners
/// through them.
#[derive(Default, Debug)]
#[allow(dead_code)]
pub struct Behaviour {
    /// Actions queued for the swarm; handed out one at a time by `poll`.
    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
    /// Known relay peers and the addresses registered for each of them.
    relays: HashMap<PeerId, Vec<Multiaddr>>,
    /// Established connections that are tracked as relay candidates.
    connections: HashMap<PeerId, Vec<Connection>>,

    /// NOTE(review): not read or written by the code visible in this file.
    reconnect: HashMap<PeerId, ReconnectState>,

    /// Receivers polled in `poll`; peers received on them are added to
    /// `relays` and the receiver is then dropped.
    discovery_channel: HashMap<u64, futures::channel::mpsc::Receiver<HashSet<PeerId>>>,

    /// Ids of dials started by `select` whose outcome is still unknown.
    pending_connection: HashSet<ConnectionId>,
    /// Peers whose selection is deferred until a connection is established
    /// or its handler reports support.
    pending_selection: HashSet<PeerId>,
    /// User supplied configuration.
    config: Config,
}
119
/// Configuration for [`Behaviour`]. The derived default disables every
/// option and uses a zero `backoff`.
#[derive(Debug, Default)]
pub struct Config {
    /// Automatically add confirmed connections to the relay list
    // NOTE(review): not read by the code visible in this file.
    pub auto_relay: bool,

    /// Automatically connect to peers that are added
    pub auto_connect: bool,

    /// Min data limit for relay reservation. Anything under this value would reject the relay reservation
    // NOTE(review): not read by the code visible in this file.
    pub limit: Option<u64>,

    /// Retry relay connection
    // NOTE(review): not read by the code visible in this file; presumably the
    // delay between reconnect attempts — confirm.
    pub backoff: Duration,
}
134
135impl Behaviour {
136    pub fn new(config: Config) -> Behaviour {
137        Self {
138            config,
139            events: VecDeque::default(),
140            relays: HashMap::default(),
141            connections: HashMap::default(),
142            reconnect: HashMap::default(),
143            discovery_channel: HashMap::default(),
144            pending_connection: HashSet::default(),
145            pending_selection: HashSet::default(),
146        }
147    }
148
149    pub fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
150        match self.relays.entry(peer_id) {
151            Entry::Vacant(entry) => {
152                entry.insert(vec![addr.clone()]);
153            }
154            Entry::Occupied(mut entry) => {
155                let list = entry.get_mut();
156                if list.contains(&addr) {
157                    return;
158                }
159                list.push(addr.clone());
160            }
161        }
162        if self.config.auto_connect {
163            if let Entry::Occupied(entry) = self.connections.entry(peer_id) {
164                if entry.get().iter().any(|connection| connection.addr == addr) {
165                    return;
166                }
167            }
168
169            let opts = DialOpts::peer_id(peer_id).build();
170            self.events.push_back(ToSwarm::Dial { opts })
171        }
172    }
173
174    pub fn remove_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
175        if let Entry::Occupied(mut entry) = self.relays.entry(peer_id) {
176            let list = entry.get_mut();
177
178            if let Some(connection) = self.connections.get(&peer_id).and_then(|connections| {
179                connections
180                    .iter()
181                    .find(|connection| connection.addr.eq(&addr))
182            }) {
183                if let Candidate::Confirmed {
184                    listener_id: Some(id),
185                    ..
186                } = connection.candidacy
187                {
188                    self.events.push_back(ToSwarm::RemoveListener { id });
189                }
190            }
191
192            list.retain(|inner_addr| addr.ne(inner_addr));
193            if list.is_empty() {
194                entry.remove();
195            }
196        }
197    }
198
    /// Iterates over every registered relay peer together with the addresses
    /// registered for it, in the map's iteration order.
    pub fn list_relays(&self) -> impl Iterator<Item = (&PeerId, &Vec<Multiaddr>)> {
        self.relays.iter()
    }
202
203    pub fn list_active_relays(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
204        self.connections
205            .iter()
206            .filter(|(_, connections)| {
207                connections.iter().any(|connection| {
208                    matches!(
209                        connection.candidacy,
210                        Candidate::Confirmed {
211                            listener_id: Some(_),
212                            ..
213                        }
214                    )
215                })
216            })
217            .map(|(peer_id, connections)| {
218                (
219                    *peer_id,
220                    connections
221                        .iter()
222                        .map(|connection| &connection.addr)
223                        .cloned()
224                        .collect::<Vec<_>>(),
225                )
226            })
227            .collect()
228    }
229
230    #[allow(dead_code)]
231    fn avg_rtt(&self, connection: &Connection) -> u128 {
232        let rtts = connection.rtt.unwrap_or_default();
233        let avg: u128 = rtts.iter().map(|duration| duration.as_millis()).sum();
234        // used in case we cant produce a full avg
235        let div = rtts.iter().filter(|i| !i.is_zero()).count() as u128;
236        avg / div
237    }
238
239    pub fn select(&mut self, peer_id: PeerId) {
240        if !self.relays.contains_key(&peer_id) {
241            self.events
242                .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
243                    peer_id,
244                    result: Box::new(std::io::Error::new(
245                        std::io::ErrorKind::Other,
246                        "Peer is not added in relay list",
247                    )),
248                }));
249            return;
250        }
251
252        if self.pending_selection.contains(&peer_id) {
253            return;
254        }
255
256        if !self.connections.contains_key(&peer_id) {
257            let opts = DialOpts::peer_id(peer_id).build();
258            let id = opts.connection_id();
259            self.pending_connection.insert(id);
260            self.events.push_back(ToSwarm::Dial { opts });
261            self.pending_selection.insert(peer_id);
262            return;
263        }
264
265        let connections = match self.connections.get_mut(&peer_id) {
266            Some(conns) => conns,
267            None => return,
268        };
269
270        if connections.is_empty() {
271            return;
272        }
273
274        let mut temp_connections = connections.clone();
275        let mut rng = rand::thread_rng();
276        let connection = loop {
277            if temp_connections.is_empty() {
278                self.events
279                    .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
280                        peer_id,
281                        result: Box::new(std::io::Error::new(
282                            std::io::ErrorKind::Other,
283                            "no qualified connections available",
284                        )),
285                    }));
286                return;
287            }
288
289            let connection = temp_connections
290                .choose(&mut rng)
291                .cloned()
292                .expect("Connection available");
293
294            if let Candidate::Confirmed {
295                listener_id: Some(_),
296                ..
297            } = connection.candidacy
298            {
299                // If the candidate is confirmed, remove it from the temporary connection
300                // and try another random connection, if any is available
301                temp_connections.retain(|c| c.id != connection.id);
302                continue;
303            }
304
305            break connections
306                .iter_mut()
307                .find(|c| c.id == connection.id)
308                .expect("Connection available");
309        };
310
311        if matches!(connection.candidacy, Candidate::Pending) {
312            self.pending_selection.insert(peer_id);
313            return;
314        }
315
316        let relay_addr = connection.addr.clone().with(Protocol::P2pCircuit);
317
318        let opts = ListenOpts::new(relay_addr);
319
320        let id = opts.listener_id();
321
322        if let Candidate::Confirmed { listener_id, .. } = &mut connection.candidacy {
323            *listener_id = Some(id);
324        }
325
326        self.events.push_back(ToSwarm::ListenOn { opts });
327    }
328
329    pub fn random_select(&mut self) -> Option<PeerId> {
330        let relay_peers = self.relays.keys().copied().collect::<Vec<_>>();
331        if relay_peers.is_empty() {
332            return None;
333        }
334
335        let mut rng = rand::thread_rng();
336
337        let peer_id = relay_peers.choose(&mut rng)?;
338
339        self.select(*peer_id);
340
341        Some(*peer_id)
342    }
343
344    pub fn disable_relay(&mut self, peer_id: PeerId) {
345        for connection in self
346            .connections
347            .iter()
348            .filter(|(peer, _)| peer_id == **peer)
349            .flat_map(|(_, connections)| connections)
350        {
351            if let Candidate::Confirmed { .. } = connection.candidacy {
352                //TODO: Use ListenerId instead of closing a connection
353                let connection = libp2p::swarm::CloseConnection::One(connection.id);
354                self.events.push_back(ToSwarm::CloseConnection {
355                    peer_id,
356                    connection,
357                });
358            }
359        }
360    }
361
362    pub fn set_peer_rtt(&mut self, peer_id: PeerId, connection_id: ConnectionId, rtt: Duration) {
363        if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
364            let connections = entry.get_mut();
365            if let Some(connection) = connections
366                .iter_mut()
367                .find(|connection| connection.id == connection_id)
368            {
369                match connection.rtt.as_mut() {
370                    Some(connection_rtt) => {
371                        connection_rtt.rotate_left(1);
372                        connection_rtt[2] = rtt;
373                    }
374                    None => connection.rtt = Some([Duration::ZERO, Duration::ZERO, rtt]),
375                }
376            }
377        }
378    }
379
380    fn on_listen_on(
381        &mut self,
382        NewListenAddr {
383            listener_id,
384            addr: direct_addr,
385        }: NewListenAddr,
386    ) {
387        if !direct_addr
388            .iter()
389            .any(|proto| matches!(proto, Protocol::P2pCircuit))
390        {
391            return;
392        }
393
394        for connection in self
395            .connections
396            .values_mut()
397            .flatten()
398            .filter(|connection| {
399                if let Candidate::Confirmed {
400                    listener_id: Some(id),
401                    ..
402                } = connection.candidacy
403                {
404                    id == listener_id
405                } else {
406                    false
407                }
408            })
409        {
410            match &mut connection.candidacy {
411                Candidate::Confirmed {
412                    listener_id: id,
413                    addresses,
414                } => {
415                    *id = Some(listener_id);
416                    let first = addresses.is_empty();
417                    if !addresses.contains(direct_addr) {
418                        addresses.push(direct_addr.clone());
419                        if first {
420                            self.events.push_back(ToSwarm::GenerateEvent(
421                                Event::ReservationSuccessful {
422                                    peer_id: connection.peer_id,
423                                    initial_addr: direct_addr.clone(),
424                                },
425                            ))
426                        }
427                    }
428                }
429                Candidate::Pending | Candidate::Unsupported => {
430                    // Maybe panic if we reach this clause?
431                }
432            };
433        }
434    }
435
436    fn on_listener_close(
437        &mut self,
438        ListenerClosed {
439            listener_id,
440            reason,
441        }: ListenerClosed,
442    ) {
443        let Some(connection) =
444            self.connections
445                .values_mut()
446                .flatten()
447                .find(|connection| match connection.candidacy {
448                    Candidate::Confirmed {
449                        listener_id: Some(id),
450                        ..
451                    } => id == listener_id,
452                    _ => false,
453                })
454        else {
455            return;
456        };
457
458        if let Candidate::Confirmed {
459            listener_id,
460            addresses,
461        } = &mut connection.candidacy
462        {
463            listener_id.take();
464            let addrs = std::mem::take(addresses);
465            let has_addresses = addrs.is_empty();
466
467            for addr in addrs {
468                self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
469            }
470
471            match (has_addresses, reason) {
472                (true, result) => {
473                    self.events
474                        .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
475                            peer_id: connection.peer_id,
476                            result: result
477                                .map_err(|e| std::io::Error::new(e.kind(), e.to_string()))
478                                .map_err(|e| Box::new(e) as Box<_>),
479                        }))
480                }
481                (false, Err(e)) => {
482                    self.events
483                        .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
484                            peer_id: connection.peer_id,
485                            result: Box::new(std::io::Error::new(e.kind(), e.to_string())),
486                        }))
487                }
488                _ => {}
489            }
490        }
491    }
492
493    fn on_listener_error(&mut self, ListenerError { listener_id, err }: ListenerError) {
494        let Some(connection) =
495            self.connections
496                .values_mut()
497                .flatten()
498                .find(|connection| match connection.candidacy {
499                    Candidate::Confirmed {
500                        listener_id: Some(id),
501                        ..
502                    } => id == listener_id,
503                    _ => false,
504                })
505        else {
506            return;
507        };
508
509        if let Candidate::Confirmed {
510            listener_id,
511            addresses,
512        } = &mut connection.candidacy
513        {
514            listener_id.take();
515            let addrs = std::mem::take(addresses);
516            for addr in addrs {
517                self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
518            }
519            self.events
520                .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
521                    peer_id: connection.peer_id,
522                    result: Box::new(std::io::Error::new(
523                        std::io::ErrorKind::Other,
524                        err.to_string(),
525                    )),
526                }))
527        }
528    }
529
530    fn on_listener_expired(&mut self, ExpiredListenAddr { listener_id, addr }: ExpiredListenAddr) {
531        let Some(connection) =
532            self.connections
533                .values_mut()
534                .flatten()
535                .find(|connection| match connection.candidacy {
536                    Candidate::Confirmed {
537                        listener_id: Some(id),
538                        ..
539                    } => id == listener_id,
540                    _ => false,
541                })
542        else {
543            return;
544        };
545
546        if let Candidate::Confirmed { addresses, .. } = &mut connection.candidacy {
547            if !addresses.contains(addr) {
548                return;
549            }
550
551            addresses.retain(|a| a != addr);
552
553            self.events
554                .push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
555        }
556    }
557
558    fn on_address_change(
559        &mut self,
560        AddressChange {
561            peer_id,
562            connection_id,
563            old,
564            new,
565        }: AddressChange,
566    ) {
567        let Some(connections) = self.connections.get_mut(&peer_id) else {
568            return;
569        };
570
571        let Some(connection) = connections
572            .iter_mut()
573            .find(|connection| connection.id == connection_id)
574        else {
575            return;
576        };
577
578        let old_addr = match old {
579            libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
580            libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
581        };
582
583        let new_addr = match new {
584            libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
585            libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
586        };
587
588        if old_addr == new_addr {
589            return;
590        }
591
592        connection.addr = new_addr.clone();
593    }
594
595    fn on_dial_failure(
596        &mut self,
597        DialFailure {
598            peer_id,
599            error,
600            connection_id,
601        }: DialFailure,
602    ) {
603        if !self.pending_connection.remove(&connection_id) {
604            return;
605        }
606
607        let Some(peer_id) = peer_id else {
608            return;
609        };
610
611        self.events
612            .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
613                peer_id,
614                result: Box::new(std::io::Error::new(
615                    std::io::ErrorKind::Other,
616                    error.to_string(),
617                )),
618            }));
619
620        //TODO: perform checks and do a reconnect attempt
621
622        // let Some(peer_id) = peer_id else {
623        //     return;
624        // };
625
626        // match error {
627        //     libp2p::swarm::DialError::LocalPeerId { .. } => {
628        //         self.relays.remove(&peer_id);
629        //     }
630        //     libp2p::swarm::DialError::NoAddresses => {
631        //         self.relays.remove(&peer_id);
632        //     }
633        //     libp2p::swarm::DialError::DialPeerConditionFalse(_) => {}
634        //     libp2p::swarm::DialError::Aborted => {}
635        //     libp2p::swarm::DialError::WrongPeerId { obtained, endpoint } => {}
636        //     libp2p::swarm::DialError::Denied { cause } => {}
637        //     libp2p::swarm::DialError::Transport(_) => {}
638        // }
639    }
640
641    fn on_connection_established(
642        &mut self,
643        ConnectionEstablished {
644            peer_id,
645            connection_id,
646            endpoint,
647            ..
648        }: ConnectionEstablished,
649    ) {
650        self.pending_connection.remove(&connection_id);
651
652        let addr = match endpoint {
653            libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
654            libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
655        };
656
657        match self.relays.entry(peer_id) {
658            Entry::Occupied(entry) => {
659                let mut addr = addr.clone();
660                addr.pop();
661
662                if !entry.get().contains(&addr) {
663                    return;
664                }
665            }
666            Entry::Vacant(_) if self.config.auto_connect => {}
667            _ => return,
668        };
669
670        let connection = Connection {
671            peer_id,
672            id: connection_id,
673            addr: addr.clone(),
674            candidacy: Candidate::Pending,
675            rtt: None,
676        };
677
678        self.connections
679            .entry(peer_id)
680            .or_default()
681            .push(connection);
682
683        if self.pending_selection.remove(&peer_id) {
684            self.select(peer_id);
685        }
686    }
687
688    fn on_connection_closed(
689        &mut self,
690        ConnectionClosed {
691            peer_id,
692            connection_id,
693            ..
694        }: ConnectionClosed<'_>,
695    ) {
696        if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
697            let connections = entry.get_mut();
698            let Some(connection) = connections
699                .iter_mut()
700                .find(|connection| connection.id == connection_id)
701            else {
702                return;
703            };
704
705            //Note: If the listener has been closed, then this condition may not happen
706            //      but is set as a precaution
707            //TODO: Confirm that the order is consistent if the relay is removed
708            if let Candidate::Confirmed {
709                listener_id,
710                addresses,
711            } = &mut connection.candidacy
712            {
713                if let Some(listener_id) = listener_id.take() {
714                    let addrs = std::mem::take(addresses);
715                    for addr in addrs {
716                        self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
717                    }
718                    self.events
719                        .push_back(ToSwarm::RemoveListener { id: listener_id });
720
721                    self.events
722                        .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
723                            peer_id: connection.peer_id,
724                            result: Ok(()),
725                        }))
726                }
727            }
728
729            connections.retain(|connection| connection.id != connection_id);
730
731            if connections.is_empty() {
732                entry.remove();
733            }
734        }
735    }
736
    /// Hook for events of the relay client behaviour.
    ///
    /// Currently a no-op: the event is ignored.
    pub fn process_relay_event(&mut self, _: libp2p::relay::client::Event) {
        //TODO: Perform checks on limit reported from the reservation and either accept it or
        //      disconnect and attempt a different connection to a relay with a higher
        //      limit requirement
        //NOTE: This is helpful if one knows that the relays will have a higher limit, otherwise
        //      this may cause long waits when attempting to find relays with higher limits
        //      for the reservation
        // match event {
        //     libp2p::relay::client::Event::ReservationReqAccepted { .. } => {}
        //     _ => {}
        // }
    }
749}
750
751impl NetworkBehaviour for Behaviour {
752    type ToSwarm = Event;
753    type ConnectionHandler = handler::Handler;
754
    /// Accepts every inbound connection and gives it a default handler; none
    /// of the arguments are inspected.
    fn handle_established_inbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        _peer: PeerId,
        _local_addr: &Multiaddr,
        _remote_addr: &Multiaddr,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(handler::Handler::default())
    }
764
    /// Accepts every outbound connection and gives it a default handler; none
    /// of the arguments are inspected.
    fn handle_established_outbound_connection(
        &mut self,
        _connection_id: ConnectionId,
        _peer: PeerId,
        _addr: &Multiaddr,
        _role_override: Endpoint,
        _: PortUse,
    ) -> Result<THandler<Self>, ConnectionDenied> {
        Ok(handler::Handler::default())
    }
775
776    fn handle_pending_outbound_connection(
777        &mut self,
778        _: ConnectionId,
779        maybe_peer: Option<PeerId>,
780        _: &[Multiaddr],
781        _: Endpoint,
782    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
783        let addrs = maybe_peer
784            .and_then(|peer_id| self.relays.get(&peer_id))
785            .cloned()
786            .unwrap_or_default();
787
788        Ok(addrs)
789    }
790
    /// Routes swarm notifications to the matching private handler. Event
    /// kinds without a handler are ignored.
    fn on_swarm_event(&mut self, event: FromSwarm) {
        match event {
            FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
            FromSwarm::ConnectionClosed(event) => self.on_connection_closed(event),
            FromSwarm::NewListenAddr(event) => self.on_listen_on(event),
            FromSwarm::ListenerClosed(event) => self.on_listener_close(event),
            FromSwarm::DialFailure(event) => self.on_dial_failure(event),
            FromSwarm::ListenerError(event) => self.on_listener_error(event),
            FromSwarm::ExpiredListenAddr(event) => self.on_listener_expired(event),
            FromSwarm::AddressChange(event) => self.on_address_change(event),
            // Everything else is of no interest to the relay manager.
            _ => {}
        }
    }
804
805    fn on_connection_handler_event(
806        &mut self,
807        peer_id: PeerId,
808        connection_id: ConnectionId,
809        event: THandlerOutEvent<Self>,
810    ) {
811        match event {
812            handler::Out::Supported => {
813                if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
814                    let list = entry.get_mut();
815                    if let Some(connection) = list
816                        .iter_mut()
817                        .find(|connection| connection.id == connection_id)
818                    {
819                        let canadate_state = &mut connection.candidacy;
820
821                        if matches!(canadate_state, Candidate::Pending | Candidate::Unsupported) {
822                            *canadate_state = Candidate::Confirmed {
823                                listener_id: None,
824                                addresses: vec![],
825                            };
826                            if self.pending_selection.remove(&peer_id) {
827                                self.select(peer_id);
828                            }
829                        }
830                    }
831                }
832            }
833            handler::Out::Unsupported => {
834                if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
835                    let list = entry.get_mut();
836                    if let Some(connection) = list
837                        .iter_mut()
838                        .find(|connection| connection.id == connection_id)
839                    {
840                        let canadate_state = &mut connection.candidacy;
841
842                        if let Candidate::Confirmed {
843                            listener_id: Some(id),
844                            ..
845                        } = canadate_state
846                        {
847                            let id = *id;
848                            self.events.push_back(ToSwarm::RemoveListener { id });
849                        }
850
851                        *canadate_state = Candidate::Unsupported;
852                        self.pending_selection.remove(&peer_id);
853                    }
854                }
855            }
856        }
857    }
858
859    fn poll(
860        &mut self,
861        cx: &mut Context<'_>,
862    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
863        if let Some(event) = self.events.pop_front() {
864            return Poll::Ready(event);
865        }
866
867        self.discovery_channel
868            .retain(|_, rx| match rx.poll_next_unpin(cx) {
869                Poll::Ready(Some(list)) => {
870                    for peer_id in list {
871                        self.relays.entry(peer_id).or_default();
872                    }
873                    false
874                }
875                Poll::Ready(None) => false,
876                Poll::Pending => true,
877            });
878
879        Poll::Pending
880    }
881}