// libp2p_autorelay — lib.rs

1pub mod utils;
2
3use core::task::{Context, Poll};
4use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
5use futures::StreamExt;
6use libp2p::core::transport::ListenerId;
7use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
8use libp2p::multiaddr::Protocol;
9use libp2p::relay::v2::client::Event as RelayClientEvent;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::{
12    self, dummy::ConnectionHandler as DummyConnectionHandler, DialError, NetworkBehaviour,
13    PollParameters,
14};
15use log::{info, trace, warn};
16use rand::seq::SliceRandom;
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::net::IpAddr;
20use std::time::Duration;
21use wasm_timer::{Instant, Interval};
22
#[derive(Debug, Clone)]
/// Events emitted by [`AutoRelay`] to the swarm/user.
pub enum Event {
    /// A candidate was chosen for a relay reservation; `addrs` are the
    /// candidate's known addresses (without a trailing `/p2p` component).
    ReservationSelected {
        peer_id: PeerId,
        addrs: Vec<Multiaddr>,
    },
    /// A reservation tracked by `listener` was removed.
    // NOTE(review): not emitted anywhere in this file — confirm external use.
    ReservationRemoved {
        peer_id: PeerId,
        listener: ListenerId,
    },
    /// A peer was added to the candidate pool with the given addresses.
    Added {
        peer_id: PeerId,
        addr: Vec<Multiaddr>,
    },
    /// Request for external discovery: candidate peer ids should be sent
    /// back through the provided channel.
    FindCandidate(UnboundedSender<PeerId>),
    /// The candidate pool reached its configured limit.
    // NOTE(review): not emitted anywhere in this file — confirm external use.
    CandidateLimitReached {
        current: usize,
        limit: usize,
    },
    /// The reservation count reached its configured limit.
    // NOTE(review): not emitted anywhere in this file — confirm external use.
    ReservationLimitReached {
        current: usize,
        limit: usize,
    },
}
47
48type NetworkBehaviourAction = swarm::NetworkBehaviourAction<Event, DummyConnectionHandler>;
49
#[derive(Debug, Copy, Clone)]
/// Bounds on how many relay candidates are collected and how many relay
/// reservations are held at once.
pub struct RelayLimits {
    // Selection refuses to run while the pool is below this size
    pub min_candidates: usize,
    // The candidate pool stops accepting entries at this size
    pub max_candidates: usize,
    // Desired lower bound on active reservations
    pub min_reservation: usize,
    // No further reservations are attempted once this count is reached
    pub max_reservation: usize,
}
57
58impl Default for RelayLimits {
59    fn default() -> Self {
60        Self {
61            min_candidates: 1,
62            max_candidates: 20,
63            min_reservation: 1,
64            max_reservation: 2,
65        }
66    }
67}
68
#[derive(Debug, Clone, Copy, Default)]
//note: Should only really be used internally to determine nat status
//      if autonat is used, otherwise this can be ignored
//TODO: Determine if this is something we should listen on?
pub enum Nat {
    // Directly reachable; a relay is probably unnecessary
    Public,
    // Behind NAT; a relay is likely needed
    Private,
    // No determination yet (initial state)
    #[default]
    Unknown,
}
79
#[allow(dead_code)]
//Note: `candidates_without_addr` is not in use but is meant to be used for fetching from
//      kad providers (or just sending peers through channels that will be used as a relay)
pub struct AutoRelay {
    // Actions queued for the swarm; drained one at a time in `poll`
    events: VecDeque<NetworkBehaviourAction>,

    // Candidates we have dialed but whose connection is not yet confirmed
    pending_candidates: HashMap<PeerId, Vec<Multiaddr>>,

    // Peers suggested as relays for which no address is known yet
    candidates_without_addr: HashSet<PeerId>,

    // Confirmed relay candidates and their addresses (without `/p2p` suffix)
    candidates: HashMap<PeerId, Vec<Multiaddr>>,

    // Sliding window of the last three rtt samples per candidate
    candidates_rtt: HashMap<PeerId, [Duration; 3]>,

    // Maps an established connection back to the candidate address it used
    candidates_connection: HashMap<ConnectionId, Multiaddr>,

    // Active reservations keyed by the circuit listener id
    reservation: HashMap<ListenerId, Multiaddr>,

    // Peers we currently hold a reservation with
    reservation_peer: HashSet<PeerId>,

    // Peers selected for a reservation that has not completed yet
    pending_reservation_peer: HashSet<PeerId>,

    // Receives candidate peer ids from external discovery (see `Event::FindCandidate`)
    channel: Option<UnboundedReceiver<PeerId>>,

    // Will have a delay start, but will be used to find candidates that might be used
    interval: Interval,

    // Note: In case we should ignore any relays, such as some who have had bad connection,
    //       ping, not reliable in some, or might want to temporarily ignore
    // If the value is `None` the peer will remain blacklisted
    // TODO: add logic to handle duration, if any
    blacklist: HashMap<PeerId, Option<Duration>>,

    // Used to check for the nat status. If we are not behind a NAT, then a relay probably should not be used
    // since a direct connection could be established
    // TODO: Investigate if the status changes when port mapping is done
    nat_status: Nat,

    // Thresholds for the candidate pool and reservation count
    limits: RelayLimits,
}
120
121impl Default for AutoRelay {
122    fn default() -> Self {
123        Self {
124            events: Default::default(),
125            pending_candidates: Default::default(),
126            candidates_without_addr: Default::default(),
127            candidates: Default::default(),
128            candidates_rtt: Default::default(),
129            candidates_connection: Default::default(),
130            channel: None,
131            reservation: Default::default(),
132            reservation_peer: Default::default(),
133            pending_reservation_peer: Default::default(),
134            blacklist: Default::default(),
135            interval: Interval::new_at(
136                Instant::now() + Duration::from_secs(10),
137                Duration::from_secs(5),
138            ),
139            nat_status: Nat::Unknown,
140            limits: Default::default(),
141        }
142    }
143}
144
145impl AutoRelay {
    /// Returns a copy of the configured candidate/reservation limits.
    pub fn limits(&self) -> RelayLimits {
        self.limits
    }
149
    /// Number of confirmed relay candidates currently in the pool.
    pub fn candidates_amount(&self) -> usize {
        self.candidates.len()
    }
153
    /// Number of peers we currently hold a relay reservation with.
    pub fn reservation_amount(&self) -> usize {
        self.reservation_peer.len()
    }
157
158    // Used to manually add a relay candidate
159    pub fn add_static_relay(&mut self, peer_id: PeerId, addr: Multiaddr) -> anyhow::Result<()> {
160        //TODO: Maybe strip invalid protocols from address?
161        if addr
162            .iter()
163            .any(|proto| matches!(proto, Protocol::P2pCircuit | Protocol::P2p(_)))
164        {
165            anyhow::bail!("address contained an invalid protocol");
166        }
167
168        info!("Attempting to add {peer_id} as a static relay");
169        //TODO: If address contains a dns, maybe we should resolve it?
170
171        if let Entry::Occupied(entry) = self.pending_candidates.entry(peer_id) {
172            if entry.get().contains(&addr) {
173                anyhow::bail!("Address is already pending");
174            }
175        }
176
177        if let Entry::Occupied(entry) = self.candidates.entry(peer_id) {
178            if entry.get().contains(&addr) {
179                anyhow::bail!("Address is already added");
180            }
181        }
182
183        trace!("Connecting to {:?}", addr);
184
185        let new_addr = addr.clone().with(Protocol::P2p(peer_id.into()));
186
187        let handler = self.new_handler();
188
189        //Thought: Should we set with a new peer instead and have the condition set to always in the event we are connected but the peer somehow was not
190        //         apart of the list here?
191        self.events.push_back(NetworkBehaviourAction::Dial {
192            opts: DialOpts::unknown_peer_id().address(new_addr).build(),
193            handler,
194        });
195
196        self.pending_candidates
197            .entry(peer_id)
198            .or_default()
199            .push(addr);
200
201        Ok(())
202    }
203
    /// Iterates over the peer ids of all confirmed relay candidates.
    pub fn list_candidates(&self) -> impl Iterator<Item = &PeerId> {
        self.candidates.keys()
    }
207
208    pub fn list_candidates_addr(&self) -> impl Iterator<Item = Vec<Multiaddr>> + '_ {
209        self.candidates.iter().map(|(peer, addrs)| {
210            addrs
211                .iter()
212                .cloned()
213                .map(|addr| addr.with(Protocol::P2p((*peer).into())))
214                .collect::<Vec<_>>()
215        })
216    }
217
    /// Iterates over the peers we currently hold a reservation with.
    pub fn list_reservation_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
        self.reservation_peer.iter()
    }
221
222    pub fn in_candidate_threshold(&self) -> bool {
223        self.candidates.len() >= self.limits.min_candidates
224            && self.candidates.len() <= self.limits.max_candidates
225    }
226
227    pub fn out_of_candidate_threshold(&self) -> bool {
228        self.candidates.len() < self.limits.min_candidates
229            || self.candidates.len() > self.limits.max_candidates
230    }
231
232    pub fn in_reservation_threshold(&self) -> bool {
233        self.reservation_peer.len() >= self.limits.min_reservation
234            && self.reservation_peer.len() <= self.limits.max_reservation
235    }
236
237    pub fn out_of_reservation_threshold(&self) -> bool {
238        self.reservation_peer.len() < self.limits.min_reservation
239            || self.reservation_peer.len() > self.limits.max_reservation
240    }
241
242    pub fn avg_rtt(&self, peer_id: PeerId) -> Option<u128> {
243        let rtts = self.candidates_rtt.get(&peer_id).copied()?;
244        let avg: u128 = rtts.iter().map(|duration| duration.as_millis()).sum();
245        // used in case we cant produce a full avg
246        let div = rtts.iter().filter(|i| !i.is_zero()).count() as u128;
247        let avg = avg / div;
248        Some(avg)
249    }
250
    #[allow(dead_code)]
    //TODO: Maybe ignore for now?
    // Updates the cached NAT status (intended to be fed from autonat).
    pub(crate) fn change_nat(&mut self, nat: Nat) {
        self.nat_status = nat;
        //TODO: If nat change to public to probably disconnect relay
        //      but if it change to private to attempt to utilize a relay
    }
258
259    pub fn select_candidate(&mut self, peer_id: PeerId) {
260        // We remove to prevent duplications
261        if let Some(addrs) = self.candidates.get(&peer_id).cloned() {
262            if self.pending_reservation_peer.insert(peer_id) {
263                self.events.push_back(NetworkBehaviourAction::GenerateEvent(
264                    Event::ReservationSelected { peer_id, addrs },
265                ));
266            }
267        }
268    }
269
270    pub fn find_candidates(&mut self, blacklist: bool) {
271        if blacklist {
272            for peer_id in self.candidates.keys() {
273                self.blacklist.insert(*peer_id, None);
274            }
275        }
276
277        self.candidates.clear();
278        self.candidates_rtt.clear();
279
280        let (tx, rx) = unbounded();
281
282        self.channel = Some(rx);
283
284        self.interval = Interval::new_at(
285            Instant::now() + Duration::from_secs(1),
286            Duration::from_secs(5),
287        );
288
289        self.events
290            .push_back(NetworkBehaviourAction::GenerateEvent(Event::FindCandidate(
291                tx,
292            )));
293    }
294
295    // This will select a candidate with the lowest ping
296    //NOTE: Might have a function that would randomize the selection
297    //      rather than relying on low rtt but it might be better this
298    //      way
299    pub fn select_candidate_low_rtt(&mut self) {
300        if self.candidates.len() < self.limits.min_candidates {
301            warn!("Candidates are below threshold");
302            return;
303        }
304
305        if self.reservation_peer.len() >= self.limits.max_reservation {
306            warn!("Reservation is at its threshold. Will not continue with select");
307            return;
308        }
309
310        let mut best_candidate = None;
311        let mut last_rtt: Option<Duration> = None;
312
313        for peer_id in self.candidates.keys() {
314            if self.reservation_peer.contains(peer_id)
315                || self.blacklist.contains_key(peer_id)
316                || self.pending_reservation_peer.contains(peer_id)
317            {
318                continue;
319            }
320            let Some(avg_rtt) = self.avg_rtt(*peer_id) else {
321                continue;
322            };
323
324            if let Some(current) = last_rtt.as_mut() {
325                if avg_rtt < current.as_millis() {
326                    *current = Duration::from_millis(avg_rtt as _);
327                    best_candidate = Some(*peer_id);
328                }
329            } else {
330                last_rtt = Some(Duration::from_millis(avg_rtt as _));
331                best_candidate = Some(*peer_id);
332            }
333        }
334
335        //Note/TODO: If rtt is high for the best candidate it then it might be best to eject all
336        //      candidates and fill up the map with new ones?
337
338        let Some(peer_id) = best_candidate else {
339            warn!("No candidate was found");
340            return;
341        };
342
343        if self.pending_reservation_peer.contains(&peer_id) {
344            return;
345        }
346
347        if self.reservation_peer.get(&peer_id).is_some() {
348            return;
349        }
350
351        self.select_candidate(peer_id);
352    }
353
354    pub fn select_candidate_random(&mut self) {
355        if self.candidates.len() < self.limits.min_candidates {
356            warn!("Candidates are below threshold");
357            return;
358        }
359
360        if self.reservation_peer.len() >= self.limits.max_reservation {
361            warn!("Reservation is at its threshold. Will not continue with selection");
362            return;
363        }
364
365        let mut rng = rand::thread_rng();
366
367        let list = self.candidates.keys().copied().collect::<Vec<_>>();
368
369        let Some(candidate) = list
370            .choose(&mut rng) else {
371                return;
372            };
373
374        if self.reservation_peer.get(candidate).is_some() {
375            return;
376        }
377
378        self.select_candidate(*candidate);
379    }
380
381    pub fn set_candidate_rtt(&mut self, peer_id: PeerId, rtt: Duration) {
382        if self.candidates.contains_key(&peer_id) {
383            self.candidates_rtt
384                .entry(peer_id)
385                .and_modify(|r| {
386                    r.rotate_left(1);
387                    r[2] = rtt;
388                })
389                .or_insert([Duration::from_millis(0), Duration::from_millis(0), rtt]);
390        }
391    }
392
393    pub fn inject_candidate(&mut self, peer_id: PeerId, addrs: Vec<Multiaddr>) {
394        let candidates_size = self.candidates.len();
395
396        if candidates_size >= self.limits.max_candidates || self.blacklist.contains_key(&peer_id) {
397            return;
398        }
399
400        let mut filtered_addrs = vec![];
401
402        for addr in addrs {
403            if let Some(protocol) = addr.iter().next() {
404                // Not sure of any use case where a loopback is used as a relay so this will get filtered
405                // but do we want to also check the private ip? For now it will be done but maybe
406                // allow a configuration to accept it for internal use?
407
408                //TODO: Cleanup logic for checking for unroutable addresses
409                let ip = match protocol {
410                    // Checking for private ip here since IpAddr doesnt allow us to do that
411                    Protocol::Ip4(ip) if !ip.is_private() => IpAddr::V4(ip),
412                    Protocol::Ip6(ip) => IpAddr::V6(ip),
413                    _ => continue,
414                };
415                //TODO: Use IpAddr::is_global once stable
416                if ip.is_loopback() {
417                    continue;
418                }
419            }
420            filtered_addrs.push(addr);
421        }
422
423        *self.candidates.entry(peer_id).or_default() = filtered_addrs.clone();
424        self.events
425            .push_back(NetworkBehaviourAction::GenerateEvent(Event::Added {
426                peer_id,
427                addr: filtered_addrs,
428            }));
429    }
430
431    //Note: Maybe import the relay behaviour here so we can poll the events ourselves rather than injecting it into this behaviour
432    pub fn inject_relay_client_event(&mut self, event: RelayClientEvent) {
433        match event {
434            RelayClientEvent::ReservationReqAccepted { relay_peer_id, .. } => {
435                info!("Reservation accepted with {relay_peer_id}");
436            }
437            RelayClientEvent::ReservationReqFailed {
438                relay_peer_id,
439                error,
440                ..
441            } => {
442                self.reservation_peer.remove(&relay_peer_id);
443                self.candidates.remove(&relay_peer_id);
444                self.blacklist.insert(relay_peer_id, None);
445                log::error!("Reservation request failed {relay_peer_id}: {error}");
446            }
447            e => info!("Relay Client Event: {e:?}"),
448        }
449    }
450}
451
452impl NetworkBehaviour for AutoRelay {
453    type ConnectionHandler = DummyConnectionHandler;
454    type OutEvent = Event;
455
    fn new_handler(&mut self) -> Self::ConnectionHandler {
        // This behaviour performs no per-connection protocol work, so the
        // no-op dummy handler suffices.
        DummyConnectionHandler
    }
459
460    fn inject_connection_established(
461        &mut self,
462        peer_id: &PeerId,
463        connection_id: &ConnectionId,
464        endpoint: &ConnectedPoint,
465        _failed_addresses: Option<&Vec<Multiaddr>>,
466        _other_established: usize,
467    ) {
468        //Note: Because we are not able to obtain the protocols of the connected peer
469        //      here, we will not be able to every peer injected into this event as
470        //      a candidate. Instead, we will rely on listening on the swarm
471        //      and injecting the peer information here if they support v2 relay STOP protocol
472        if let Entry::Occupied(mut entry) = self.pending_candidates.entry(*peer_id) {
473            if let ConnectedPoint::Dialer { address, .. } = endpoint {
474                let addresses = entry.get_mut();
475
476                let (_, address_without_peer) = extract_peer_id_from_multiaddr(address.clone());
477                if !addresses.contains(&address_without_peer) {
478                    return;
479                }
480
481                if let Some(index) = addresses.iter().position(|x| *x == address_without_peer) {
482                    addresses.swap_remove(index);
483                    if addresses.is_empty() {
484                        entry.remove();
485                    }
486                }
487
488                self.candidates_connection
489                    .insert(*connection_id, address.clone());
490
491                self.candidates
492                    .entry(*peer_id)
493                    .or_default()
494                    .push(address_without_peer.clone());
495
496                self.events
497                    .push_back(NetworkBehaviourAction::GenerateEvent(Event::Added {
498                        peer_id: *peer_id,
499                        addr: vec![address_without_peer],
500                    }))
501            }
502        }
503    }
504
505    fn inject_connection_closed(
506        &mut self,
507        peer_id: &PeerId,
508        id: &ConnectionId,
509        _endpoint: &ConnectedPoint,
510        _handler: Self::ConnectionHandler,
511        _remaining_established: usize,
512    ) {
513        if let Entry::Occupied(mut entry) = self.candidates.entry(*peer_id) {
514            let addresses = entry.get_mut();
515
516            if let Some(address) = self.candidates_connection.remove(id) {
517                if let Some(pos) = addresses.iter().position(|a| *a == address) {
518                    addresses.swap_remove(pos);
519                }
520
521                //TODO: Check to determine if we have a reservation and if so
522                //      to send an event and begin the process of finding another candidates
523                if addresses.is_empty() {
524                    entry.remove();
525                }
526            }
527        }
528    }
529
530    fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
531
532    fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
533
534        if self.reservation.contains_key(&id) {
535            return;
536        }
537            
538        if !addr
539            .iter()
540            .any(|proto| matches!(proto, Protocol::P2pCircuit | Protocol::P2p(_)))
541        {
542            // We want to make sure that we only collect addresses that contained p2p and p2p-circuit protocols
543            return;
544        }
545
546        let mut addr = addr.clone();
547
548        //not sure if we want to store the p2p protocol but for now strip it out
549        let Some(Protocol::P2p(_)) = addr.pop() else {
550            return;
551        };
552
553        let Some(Protocol::P2pCircuit) = addr.pop() else {
554            return;
555        };
556
557        let Some(peer_id) = peer_id_from_multiaddr(addr.clone()) else {
558            return;
559        };
560
561        self.pending_reservation_peer.remove(&peer_id);
562        self.reservation.insert(id, addr);
563        self.reservation_peer.insert(peer_id);
564    }
565
    // Intentionally empty for now: expired circuit addresses are not yet
    // reconciled against `reservation`/`reservation_peer`.
    fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
        //TODO
    }
569
    // Intentionally empty for now: a closed circuit listener should probably
    // remove its reservation and trigger a new candidate search.
    fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {
        //TODO
    }
573
574    fn inject_listener_error(&mut self, _id: ListenerId, _: &(dyn std::error::Error + 'static)) {}
575
    // Prunes addresses that failed to dial from `pending_candidates`; a peer
    // left with no addresses is removed from the pending map entirely.
    fn inject_dial_failure(
        &mut self,
        peer_id: Option<PeerId>,
        _handler: Self::ConnectionHandler,
        error: &DialError,
    ) {
        if let Some(peer_id) = peer_id {
            if let Entry::Occupied(mut entry) = self.pending_candidates.entry(peer_id) {
                let addresses = entry.get_mut();

                match error {
                    DialError::Transport(multiaddrs) => {
                        for (addr, _) in multiaddrs {
                            // `maddr` is the attempted address with its trailing
                            // /p2p component stripped, matching the stored form.
                            let (peer, maddr) = extract_peer_id_from_multiaddr(addr.clone());
                            if let Some(peer) = peer {
                                if peer != peer_id {
                                    //Note: Unlikely to happen but a precaution
                                    //TODO: Maybe panic here if there is ever a mismatch to note as a bug
                                    warn!("PeerId mismatch. {peer} != {peer_id}");
                                }
                            }

                            if let Some(pos) = addresses.iter().position(|a| *a == maddr) {
                                addresses.swap_remove(pos);
                            }
                        }
                    }
                    // Non-transport failures leave the pending addresses
                    // untouched. NOTE(review): confirm this is the desired policy.
                    _e => {}
                }

                if addresses.is_empty() {
                    entry.remove();
                }
            }
        }
    }
612
613    fn poll(
614        &mut self,
615        cx: &mut Context,
616        _: &mut impl PollParameters,
617    ) -> Poll<swarm::NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
618        if let Some(event) = self.events.pop_front() {
619            return Poll::Ready(event);
620        }
621
622        while let Poll::Ready(Some(_)) = self.interval.poll_next_unpin(cx) {
623            self.select_candidate_low_rtt();
624        }
625
626        Poll::Pending
627    }
628}
629
630pub(crate) fn peer_id_from_multiaddr(addr: Multiaddr) -> Option<PeerId> {
631    let (peer, _) = extract_peer_id_from_multiaddr(addr);
632    peer
633}
634
635#[allow(dead_code)]
636pub(crate) fn extract_peer_id_from_multiaddr(mut addr: Multiaddr) -> (Option<PeerId>, Multiaddr) {
637    match addr.pop() {
638        Some(Protocol::P2p(hash)) => match PeerId::from_multihash(hash) {
639            Ok(id) => (Some(id), addr),
640            _ => (None, addr),
641        },
642        _ => (None, addr),
643    }
644}