Skip to main content

libp2p_relay_manager/
lib.rs

1mod handler;
2
3use std::{
4    collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
5    error::Error,
6    hash::Hash,
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use futures::StreamExt;
12use futures_timer::Delay;
13use libp2p::core::transport::PortUse;
14use libp2p::{
15    core::Endpoint,
16    multiaddr::Protocol,
17    swarm::{
18        derive_prelude::{ConnectionEstablished, ListenerId},
19        dial_opts::DialOpts,
20        AddressChange, ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure,
21        ExpiredListenAddr, FromSwarm, ListenOpts, ListenerClosed, ListenerError, NetworkBehaviour,
22        NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
23    },
24    Multiaddr, PeerId,
25};
26use rand::prelude::IndexedRandom;
27
28#[derive(Debug)]
29pub enum Event {
30    ReservationSuccessful {
31        peer_id: PeerId,
32        initial_addr: Multiaddr,
33    },
34    ReservationClosed {
35        peer_id: PeerId,
36        result: Result<(), Box<dyn Error + Send>>,
37    },
38    ReservationFailure {
39        peer_id: PeerId,
40        result: Box<dyn Error + Send>,
41    },
42    FindRelays {
43        /// Namespace of where to locate possible relay candidates
44        namespace: Option<String>,
45        /// Channel to send peer ids of the candidates
46        channel: futures::channel::mpsc::Sender<HashSet<PeerId>>,
47    },
48}
49
50#[derive(Debug, Clone)]
51struct Connection {
52    pub peer_id: PeerId,
53    pub id: ConnectionId,
54    pub addr: Multiaddr,
55    pub candidacy: Candidate,
56    pub rtt: Option<[Duration; 3]>,
57}
58
59#[derive(Debug, Clone)]
60enum Candidate {
61    Pending,
62    Unsupported,
63    Confirmed {
64        listener_id: Option<ListenerId>,
65        addresses: Vec<Multiaddr>,
66    },
67}
68
69impl PartialEq for Connection {
70    fn eq(&self, other: &Self) -> bool {
71        self.id.eq(&other.id)
72    }
73}
74
75impl Eq for Connection {}
76
77#[allow(dead_code)]
78#[derive(Clone, Debug, PartialEq, Eq)]
79struct PendingReservation {
80    peer_id: PeerId,
81    connection_id: ConnectionId,
82    listener_id: ListenerId,
83}
84
85impl Hash for PendingReservation {
86    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
87        self.listener_id.hash(state)
88    }
89}
90
91#[derive(Debug)]
92#[allow(dead_code)]
93enum ReconnectState {
94    Idle {
95        backoff: bool,
96        delay: Delay,
97    },
98    Pending {
99        connection_id: ConnectionId,
100        backoff: bool,
101    },
102}
103
104#[derive(Default, Debug)]
105#[allow(dead_code)]
106pub struct Behaviour {
107    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
108    relays: HashMap<PeerId, Vec<Multiaddr>>,
109    connections: HashMap<PeerId, Vec<Connection>>,
110
111    reconnect: HashMap<PeerId, ReconnectState>,
112
113    discovery_channel: HashMap<u64, futures::channel::mpsc::Receiver<HashSet<PeerId>>>,
114
115    pending_connection: HashSet<ConnectionId>,
116    pending_selection: HashSet<PeerId>,
117    config: Config,
118}
119
/// Configuration of the relay manager [`Behaviour`].
///
/// `Default` yields `false` / `None` / a zero duration for the fields below.
#[derive(Debug, Default)]
pub struct Config {
    /// Automatically add confirmed connections to the relay list
    pub auto_relay: bool,

    /// Automatically connect to peers that are added.
    ///
    /// Also consulted when a connection is established to a peer that is not
    /// in the relay list: such a connection is only tracked when this is set.
    pub auto_connect: bool,

    /// Min data limit for relay reservation. Anything under this value would reject the relay reservation
    pub limit: Option<u64>,

    /// Retry relay connection
    pub backoff: Duration,
}
134
135impl Behaviour {
136    pub fn new(config: Config) -> Behaviour {
137        Self {
138            config,
139            events: VecDeque::default(),
140            relays: HashMap::default(),
141            connections: HashMap::default(),
142            reconnect: HashMap::default(),
143            discovery_channel: HashMap::default(),
144            pending_connection: HashSet::default(),
145            pending_selection: HashSet::default(),
146        }
147    }
148
149    pub fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
150        match self.relays.entry(peer_id) {
151            Entry::Vacant(entry) => {
152                entry.insert(vec![addr.clone()]);
153            }
154            Entry::Occupied(mut entry) => {
155                let list = entry.get_mut();
156                if list.contains(&addr) {
157                    return;
158                }
159                list.push(addr.clone());
160            }
161        }
162        if self.config.auto_connect {
163            if let Entry::Occupied(entry) = self.connections.entry(peer_id) {
164                if entry.get().iter().any(|connection| connection.addr == addr) {
165                    return;
166                }
167            }
168
169            let opts = DialOpts::peer_id(peer_id).build();
170            self.events.push_back(ToSwarm::Dial { opts })
171        }
172    }
173
174    pub fn remove_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
175        if let Entry::Occupied(mut entry) = self.relays.entry(peer_id) {
176            let list = entry.get_mut();
177
178            if let Some(connection) = self.connections.get(&peer_id).and_then(|connections| {
179                connections
180                    .iter()
181                    .find(|connection| connection.addr.eq(&addr))
182            }) {
183                if let Candidate::Confirmed {
184                    listener_id: Some(id),
185                    ..
186                } = connection.candidacy
187                {
188                    self.events.push_back(ToSwarm::RemoveListener { id });
189                }
190            }
191
192            list.retain(|inner_addr| addr.ne(inner_addr));
193            if list.is_empty() {
194                entry.remove();
195            }
196        }
197    }
198
199    pub fn list_relays(&self) -> impl Iterator<Item = (&PeerId, &Vec<Multiaddr>)> {
200        self.relays.iter()
201    }
202
203    pub fn list_active_relays(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
204        self.connections
205            .iter()
206            .filter(|(_, connections)| {
207                connections.iter().any(|connection| {
208                    matches!(
209                        connection.candidacy,
210                        Candidate::Confirmed {
211                            listener_id: Some(_),
212                            ..
213                        }
214                    )
215                })
216            })
217            .map(|(peer_id, connections)| {
218                (
219                    *peer_id,
220                    connections
221                        .iter()
222                        .map(|connection| &connection.addr)
223                        .cloned()
224                        .collect::<Vec<_>>(),
225                )
226            })
227            .collect()
228    }
229
230    #[allow(dead_code)]
231    fn avg_rtt(&self, connection: &Connection) -> u128 {
232        let rtts = connection.rtt.unwrap_or_default();
233        let avg: u128 = rtts.iter().map(|duration| duration.as_millis()).sum();
234        // used in case we cant produce a full avg
235        let div = rtts.iter().filter(|i| !i.is_zero()).count() as u128;
236        avg / div
237    }
238
239    pub fn select(&mut self, peer_id: PeerId) {
240        if !self.relays.contains_key(&peer_id) {
241            self.events
242                .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
243                    peer_id,
244                    result: Box::new(std::io::Error::other("Peer is not added in relay list")),
245                }));
246            return;
247        }
248
249        if self.pending_selection.contains(&peer_id) {
250            return;
251        }
252
253        if !self.connections.contains_key(&peer_id) {
254            let opts = DialOpts::peer_id(peer_id).build();
255            let id = opts.connection_id();
256            self.pending_connection.insert(id);
257            self.events.push_back(ToSwarm::Dial { opts });
258            self.pending_selection.insert(peer_id);
259            return;
260        }
261
262        let connections = match self.connections.get_mut(&peer_id) {
263            Some(conns) => conns,
264            None => return,
265        };
266
267        if connections.is_empty() {
268            return;
269        }
270
271        let mut temp_connections = connections.clone();
272        let mut rng = rand::rng();
273        let connection = loop {
274            if temp_connections.is_empty() {
275                self.events
276                    .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
277                        peer_id,
278                        result: Box::new(std::io::Error::other(
279                            "no qualified connections available",
280                        )),
281                    }));
282                return;
283            }
284
285            let connection = temp_connections
286                .choose(&mut rng)
287                .cloned()
288                .expect("Connection available");
289
290            if let Candidate::Confirmed {
291                listener_id: Some(_),
292                ..
293            } = connection.candidacy
294            {
295                // If the candidate is confirmed, remove it from the temporary connection
296                // and try another random connection, if any is available
297                temp_connections.retain(|c| c.id != connection.id);
298                continue;
299            }
300
301            break connections
302                .iter_mut()
303                .find(|c| c.id == connection.id)
304                .expect("Connection available");
305        };
306
307        if matches!(connection.candidacy, Candidate::Pending) {
308            self.pending_selection.insert(peer_id);
309            return;
310        }
311
312        let relay_addr = connection.addr.clone().with(Protocol::P2pCircuit);
313
314        let opts = ListenOpts::new(relay_addr);
315
316        let id = opts.listener_id();
317
318        if let Candidate::Confirmed { listener_id, .. } = &mut connection.candidacy {
319            *listener_id = Some(id);
320        }
321
322        self.events.push_back(ToSwarm::ListenOn { opts });
323    }
324
325    pub fn random_select(&mut self) -> Option<PeerId> {
326        let relay_peers = self.relays.keys().copied().collect::<Vec<_>>();
327        if relay_peers.is_empty() {
328            return None;
329        }
330
331        let mut rng = rand::rng();
332
333        let peer_id = relay_peers.choose(&mut rng)?;
334
335        self.select(*peer_id);
336
337        Some(*peer_id)
338    }
339
340    pub fn disable_relay(&mut self, peer_id: PeerId) {
341        for connection in self
342            .connections
343            .iter()
344            .filter(|(peer, _)| peer_id == **peer)
345            .flat_map(|(_, connections)| connections)
346        {
347            if let Candidate::Confirmed { .. } = connection.candidacy {
348                //TODO: Use ListenerId instead of closing a connection
349                let connection = libp2p::swarm::CloseConnection::One(connection.id);
350                self.events.push_back(ToSwarm::CloseConnection {
351                    peer_id,
352                    connection,
353                });
354            }
355        }
356    }
357
358    pub fn set_peer_rtt(&mut self, peer_id: PeerId, connection_id: ConnectionId, rtt: Duration) {
359        if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
360            let connections = entry.get_mut();
361            if let Some(connection) = connections
362                .iter_mut()
363                .find(|connection| connection.id == connection_id)
364            {
365                match connection.rtt.as_mut() {
366                    Some(connection_rtt) => {
367                        connection_rtt.rotate_left(1);
368                        connection_rtt[2] = rtt;
369                    }
370                    None => connection.rtt = Some([Duration::ZERO, Duration::ZERO, rtt]),
371                }
372            }
373        }
374    }
375
376    fn on_listen_on(
377        &mut self,
378        NewListenAddr {
379            listener_id,
380            addr: direct_addr,
381        }: NewListenAddr,
382    ) {
383        if !direct_addr
384            .iter()
385            .any(|proto| matches!(proto, Protocol::P2pCircuit))
386        {
387            return;
388        }
389
390        for connection in self
391            .connections
392            .values_mut()
393            .flatten()
394            .filter(|connection| {
395                if let Candidate::Confirmed {
396                    listener_id: Some(id),
397                    ..
398                } = connection.candidacy
399                {
400                    id == listener_id
401                } else {
402                    false
403                }
404            })
405        {
406            match &mut connection.candidacy {
407                Candidate::Confirmed {
408                    listener_id: id,
409                    addresses,
410                } => {
411                    *id = Some(listener_id);
412                    let first = addresses.is_empty();
413                    if !addresses.contains(direct_addr) {
414                        addresses.push(direct_addr.clone());
415                        if first {
416                            self.events.push_back(ToSwarm::GenerateEvent(
417                                Event::ReservationSuccessful {
418                                    peer_id: connection.peer_id,
419                                    initial_addr: direct_addr.clone(),
420                                },
421                            ))
422                        }
423                    }
424                }
425                Candidate::Pending | Candidate::Unsupported => {
426                    // Maybe panic if we reach this clause?
427                }
428            };
429        }
430    }
431
432    fn on_listener_close(
433        &mut self,
434        ListenerClosed {
435            listener_id,
436            reason,
437        }: ListenerClosed,
438    ) {
439        let Some(connection) =
440            self.connections
441                .values_mut()
442                .flatten()
443                .find(|connection| match connection.candidacy {
444                    Candidate::Confirmed {
445                        listener_id: Some(id),
446                        ..
447                    } => id == listener_id,
448                    _ => false,
449                })
450        else {
451            return;
452        };
453
454        if let Candidate::Confirmed {
455            listener_id,
456            addresses,
457        } = &mut connection.candidacy
458        {
459            listener_id.take();
460            let addrs = std::mem::take(addresses);
461            let has_addresses = addrs.is_empty();
462
463            for addr in addrs {
464                self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
465            }
466
467            match (has_addresses, reason) {
468                (true, result) => {
469                    self.events
470                        .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
471                            peer_id: connection.peer_id,
472                            result: result
473                                .map_err(|e| std::io::Error::new(e.kind(), e.to_string()))
474                                .map_err(|e| Box::new(e) as Box<_>),
475                        }))
476                }
477                (false, Err(e)) => {
478                    self.events
479                        .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
480                            peer_id: connection.peer_id,
481                            result: Box::new(std::io::Error::new(e.kind(), e.to_string())),
482                        }))
483                }
484                _ => {}
485            }
486        }
487    }
488
489    fn on_listener_error(&mut self, ListenerError { listener_id, err }: ListenerError) {
490        let Some(connection) =
491            self.connections
492                .values_mut()
493                .flatten()
494                .find(|connection| match connection.candidacy {
495                    Candidate::Confirmed {
496                        listener_id: Some(id),
497                        ..
498                    } => id == listener_id,
499                    _ => false,
500                })
501        else {
502            return;
503        };
504
505        if let Candidate::Confirmed {
506            listener_id,
507            addresses,
508        } = &mut connection.candidacy
509        {
510            listener_id.take();
511            let addrs = std::mem::take(addresses);
512            for addr in addrs {
513                self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
514            }
515            self.events
516                .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
517                    peer_id: connection.peer_id,
518                    result: Box::new(std::io::Error::other(err.to_string())),
519                }))
520        }
521    }
522
523    fn on_listener_expired(&mut self, ExpiredListenAddr { listener_id, addr }: ExpiredListenAddr) {
524        let Some(connection) =
525            self.connections
526                .values_mut()
527                .flatten()
528                .find(|connection| match connection.candidacy {
529                    Candidate::Confirmed {
530                        listener_id: Some(id),
531                        ..
532                    } => id == listener_id,
533                    _ => false,
534                })
535        else {
536            return;
537        };
538
539        if let Candidate::Confirmed { addresses, .. } = &mut connection.candidacy {
540            if !addresses.contains(addr) {
541                return;
542            }
543
544            addresses.retain(|a| a != addr);
545
546            self.events
547                .push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
548        }
549    }
550
551    fn on_address_change(
552        &mut self,
553        AddressChange {
554            peer_id,
555            connection_id,
556            old,
557            new,
558        }: AddressChange,
559    ) {
560        let Some(connections) = self.connections.get_mut(&peer_id) else {
561            return;
562        };
563
564        let Some(connection) = connections
565            .iter_mut()
566            .find(|connection| connection.id == connection_id)
567        else {
568            return;
569        };
570
571        let old_addr = match old {
572            libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
573            libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
574        };
575
576        let new_addr = match new {
577            libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
578            libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
579        };
580
581        if old_addr == new_addr {
582            return;
583        }
584
585        connection.addr = new_addr.clone();
586    }
587
588    fn on_dial_failure(
589        &mut self,
590        DialFailure {
591            peer_id,
592            error,
593            connection_id,
594        }: DialFailure,
595    ) {
596        if !self.pending_connection.remove(&connection_id) {
597            return;
598        }
599
600        let Some(peer_id) = peer_id else {
601            return;
602        };
603
604        self.events
605            .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
606                peer_id,
607                result: Box::new(std::io::Error::other(error.to_string())),
608            }));
609
610        //TODO: perform checks and do a reconnect attempt
611
612        // let Some(peer_id) = peer_id else {
613        //     return;
614        // };
615
616        // match error {
617        //     libp2p::swarm::DialError::LocalPeerId { .. } => {
618        //         self.relays.remove(&peer_id);
619        //     }
620        //     libp2p::swarm::DialError::NoAddresses => {
621        //         self.relays.remove(&peer_id);
622        //     }
623        //     libp2p::swarm::DialError::DialPeerConditionFalse(_) => {}
624        //     libp2p::swarm::DialError::Aborted => {}
625        //     libp2p::swarm::DialError::WrongPeerId { obtained, endpoint } => {}
626        //     libp2p::swarm::DialError::Denied { cause } => {}
627        //     libp2p::swarm::DialError::Transport(_) => {}
628        // }
629    }
630
631    fn on_connection_established(
632        &mut self,
633        ConnectionEstablished {
634            peer_id,
635            connection_id,
636            endpoint,
637            ..
638        }: ConnectionEstablished,
639    ) {
640        self.pending_connection.remove(&connection_id);
641
642        let addr = match endpoint {
643            libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
644            libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
645        };
646
647        match self.relays.entry(peer_id) {
648            Entry::Occupied(entry) => {
649                let mut addr = addr.clone();
650                addr.pop();
651
652                if !entry.get().contains(&addr) {
653                    return;
654                }
655            }
656            Entry::Vacant(_) if self.config.auto_connect => {}
657            _ => return,
658        };
659
660        let connection = Connection {
661            peer_id,
662            id: connection_id,
663            addr: addr.clone(),
664            candidacy: Candidate::Pending,
665            rtt: None,
666        };
667
668        self.connections
669            .entry(peer_id)
670            .or_default()
671            .push(connection);
672
673        if self.pending_selection.remove(&peer_id) {
674            self.select(peer_id);
675        }
676    }
677
678    fn on_connection_closed(
679        &mut self,
680        ConnectionClosed {
681            peer_id,
682            connection_id,
683            ..
684        }: ConnectionClosed<'_>,
685    ) {
686        if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
687            let connections = entry.get_mut();
688            let Some(connection) = connections
689                .iter_mut()
690                .find(|connection| connection.id == connection_id)
691            else {
692                return;
693            };
694
695            //Note: If the listener has been closed, then this condition may not happen
696            //      but is set as a precaution
697            //TODO: Confirm that the order is consistent if the relay is removed
698            if let Candidate::Confirmed {
699                listener_id,
700                addresses,
701            } = &mut connection.candidacy
702            {
703                if let Some(listener_id) = listener_id.take() {
704                    let addrs = std::mem::take(addresses);
705                    for addr in addrs {
706                        self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
707                    }
708                    self.events
709                        .push_back(ToSwarm::RemoveListener { id: listener_id });
710
711                    self.events
712                        .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
713                            peer_id: connection.peer_id,
714                            result: Ok(()),
715                        }))
716                }
717            }
718
719            connections.retain(|connection| connection.id != connection_id);
720
721            if connections.is_empty() {
722                entry.remove();
723            }
724        }
725    }
726
727    pub fn process_relay_event(&mut self, _: libp2p::relay::client::Event) {
728        //TODO: Perform checks on limit reported from the reservation and either accept it or
729        //      disconnect and attempt a different connection to a relay with a higher
730        //      limit requirement
731        //NOTE: This is helpful if one knows that the relays will have a higher limit, otherwise
732        //      this may cause long waits when attempting to find relays with higher limits
733        //      for the reservation
734        // match event {
735        //     libp2p::relay::client::Event::ReservationReqAccepted { .. } => {}
736        //     _ => {}
737        // }
738    }
739}
740
741impl NetworkBehaviour for Behaviour {
742    type ToSwarm = Event;
743    type ConnectionHandler = handler::Handler;
744
745    fn handle_established_inbound_connection(
746        &mut self,
747        _connection_id: ConnectionId,
748        _peer: PeerId,
749        _local_addr: &Multiaddr,
750        _remote_addr: &Multiaddr,
751    ) -> Result<THandler<Self>, ConnectionDenied> {
752        Ok(handler::Handler::default())
753    }
754
755    fn handle_established_outbound_connection(
756        &mut self,
757        _connection_id: ConnectionId,
758        _peer: PeerId,
759        _addr: &Multiaddr,
760        _role_override: Endpoint,
761        _: PortUse,
762    ) -> Result<THandler<Self>, ConnectionDenied> {
763        Ok(handler::Handler::default())
764    }
765
766    fn handle_pending_outbound_connection(
767        &mut self,
768        _: ConnectionId,
769        maybe_peer: Option<PeerId>,
770        _: &[Multiaddr],
771        _: Endpoint,
772    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
773        let addrs = maybe_peer
774            .and_then(|peer_id| self.relays.get(&peer_id))
775            .cloned()
776            .unwrap_or_default();
777
778        Ok(addrs)
779    }
780
781    fn on_swarm_event(&mut self, event: FromSwarm) {
782        match event {
783            FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
784            FromSwarm::ConnectionClosed(event) => self.on_connection_closed(event),
785            FromSwarm::NewListenAddr(event) => self.on_listen_on(event),
786            FromSwarm::ListenerClosed(event) => self.on_listener_close(event),
787            FromSwarm::DialFailure(event) => self.on_dial_failure(event),
788            FromSwarm::ListenerError(event) => self.on_listener_error(event),
789            FromSwarm::ExpiredListenAddr(event) => self.on_listener_expired(event),
790            FromSwarm::AddressChange(event) => self.on_address_change(event),
791            _ => {}
792        }
793    }
794
795    fn on_connection_handler_event(
796        &mut self,
797        peer_id: PeerId,
798        connection_id: ConnectionId,
799        event: THandlerOutEvent<Self>,
800    ) {
801        match event {
802            handler::Out::Supported => {
803                if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
804                    let list = entry.get_mut();
805                    if let Some(connection) = list
806                        .iter_mut()
807                        .find(|connection| connection.id == connection_id)
808                    {
809                        let canadate_state = &mut connection.candidacy;
810
811                        if matches!(canadate_state, Candidate::Pending | Candidate::Unsupported) {
812                            *canadate_state = Candidate::Confirmed {
813                                listener_id: None,
814                                addresses: vec![],
815                            };
816                            if self.pending_selection.remove(&peer_id) {
817                                self.select(peer_id);
818                            }
819                        }
820                    }
821                }
822            }
823            handler::Out::Unsupported => {
824                if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
825                    let list = entry.get_mut();
826                    if let Some(connection) = list
827                        .iter_mut()
828                        .find(|connection| connection.id == connection_id)
829                    {
830                        let canadate_state = &mut connection.candidacy;
831
832                        if let Candidate::Confirmed {
833                            listener_id: Some(id),
834                            ..
835                        } = canadate_state
836                        {
837                            let id = *id;
838                            self.events.push_back(ToSwarm::RemoveListener { id });
839                        }
840
841                        *canadate_state = Candidate::Unsupported;
842                        self.pending_selection.remove(&peer_id);
843                    }
844                }
845            }
846        }
847    }
848
849    fn poll(
850        &mut self,
851        cx: &mut Context<'_>,
852    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
853        if let Some(event) = self.events.pop_front() {
854            return Poll::Ready(event);
855        }
856
857        self.discovery_channel
858            .retain(|_, rx| match rx.poll_next_unpin(cx) {
859                Poll::Ready(Some(list)) => {
860                    for peer_id in list {
861                        self.relays.entry(peer_id).or_default();
862                    }
863                    false
864                }
865                Poll::Ready(None) => false,
866                Poll::Pending => true,
867            });
868
869        Poll::Pending
870    }
871}