1pub mod utils;
2
3use core::task::{Context, Poll};
4use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
5use futures::StreamExt;
6use libp2p::core::transport::ListenerId;
7use libp2p::core::{connection::ConnectionId, ConnectedPoint, Multiaddr, PeerId};
8use libp2p::multiaddr::Protocol;
9use libp2p::relay::v2::client::Event as RelayClientEvent;
10use libp2p::swarm::dial_opts::DialOpts;
11use libp2p::swarm::{
12 self, dummy::ConnectionHandler as DummyConnectionHandler, DialError, NetworkBehaviour,
13 PollParameters,
14};
15use log::{info, trace, warn};
16use rand::seq::SliceRandom;
17use std::collections::hash_map::Entry;
18use std::collections::{HashMap, HashSet, VecDeque};
19use std::net::IpAddr;
20use std::time::Duration;
21use wasm_timer::{Instant, Interval};
22
23#[derive(Debug, Clone)]
24pub enum Event {
25 ReservationSelected {
26 peer_id: PeerId,
27 addrs: Vec<Multiaddr>,
28 },
29 ReservationRemoved {
30 peer_id: PeerId,
31 listener: ListenerId,
32 },
33 Added {
34 peer_id: PeerId,
35 addr: Vec<Multiaddr>,
36 },
37 FindCandidate(UnboundedSender<PeerId>),
38 CandidateLimitReached {
39 current: usize,
40 limit: usize,
41 },
42 ReservationLimitReached {
43 current: usize,
44 limit: usize,
45 },
46}
47
48type NetworkBehaviourAction = swarm::NetworkBehaviourAction<Event, DummyConnectionHandler>;
49
50#[derive(Debug, Copy, Clone)]
51pub struct RelayLimits {
52 pub min_candidates: usize,
53 pub max_candidates: usize,
54 pub min_reservation: usize,
55 pub max_reservation: usize,
56}
57
58impl Default for RelayLimits {
59 fn default() -> Self {
60 Self {
61 min_candidates: 1,
62 max_candidates: 20,
63 min_reservation: 1,
64 max_reservation: 2,
65 }
66 }
67}
68
69#[derive(Debug, Clone, Copy, Default)]
70pub enum Nat {
74 Public,
75 Private,
76 #[default]
77 Unknown,
78}
79
80#[allow(dead_code)]
81pub struct AutoRelay {
84 events: VecDeque<NetworkBehaviourAction>,
85
86 pending_candidates: HashMap<PeerId, Vec<Multiaddr>>,
87
88 candidates_without_addr: HashSet<PeerId>,
89
90 candidates: HashMap<PeerId, Vec<Multiaddr>>,
91
92 candidates_rtt: HashMap<PeerId, [Duration; 3]>,
93
94 candidates_connection: HashMap<ConnectionId, Multiaddr>,
95
96 reservation: HashMap<ListenerId, Multiaddr>,
97
98 reservation_peer: HashSet<PeerId>,
99
100 pending_reservation_peer: HashSet<PeerId>,
101
102 channel: Option<UnboundedReceiver<PeerId>>,
103
104 interval: Interval,
106
107 blacklist: HashMap<PeerId, Option<Duration>>,
112
113 nat_status: Nat,
117
118 limits: RelayLimits,
119}
120
121impl Default for AutoRelay {
122 fn default() -> Self {
123 Self {
124 events: Default::default(),
125 pending_candidates: Default::default(),
126 candidates_without_addr: Default::default(),
127 candidates: Default::default(),
128 candidates_rtt: Default::default(),
129 candidates_connection: Default::default(),
130 channel: None,
131 reservation: Default::default(),
132 reservation_peer: Default::default(),
133 pending_reservation_peer: Default::default(),
134 blacklist: Default::default(),
135 interval: Interval::new_at(
136 Instant::now() + Duration::from_secs(10),
137 Duration::from_secs(5),
138 ),
139 nat_status: Nat::Unknown,
140 limits: Default::default(),
141 }
142 }
143}
144
145impl AutoRelay {
146 pub fn limits(&self) -> RelayLimits {
147 self.limits
148 }
149
150 pub fn candidates_amount(&self) -> usize {
151 self.candidates.len()
152 }
153
154 pub fn reservation_amount(&self) -> usize {
155 self.reservation_peer.len()
156 }
157
158 pub fn add_static_relay(&mut self, peer_id: PeerId, addr: Multiaddr) -> anyhow::Result<()> {
160 if addr
162 .iter()
163 .any(|proto| matches!(proto, Protocol::P2pCircuit | Protocol::P2p(_)))
164 {
165 anyhow::bail!("address contained an invalid protocol");
166 }
167
168 info!("Attempting to add {peer_id} as a static relay");
169 if let Entry::Occupied(entry) = self.pending_candidates.entry(peer_id) {
172 if entry.get().contains(&addr) {
173 anyhow::bail!("Address is already pending");
174 }
175 }
176
177 if let Entry::Occupied(entry) = self.candidates.entry(peer_id) {
178 if entry.get().contains(&addr) {
179 anyhow::bail!("Address is already added");
180 }
181 }
182
183 trace!("Connecting to {:?}", addr);
184
185 let new_addr = addr.clone().with(Protocol::P2p(peer_id.into()));
186
187 let handler = self.new_handler();
188
189 self.events.push_back(NetworkBehaviourAction::Dial {
192 opts: DialOpts::unknown_peer_id().address(new_addr).build(),
193 handler,
194 });
195
196 self.pending_candidates
197 .entry(peer_id)
198 .or_default()
199 .push(addr);
200
201 Ok(())
202 }
203
204 pub fn list_candidates(&self) -> impl Iterator<Item = &PeerId> {
205 self.candidates.keys()
206 }
207
208 pub fn list_candidates_addr(&self) -> impl Iterator<Item = Vec<Multiaddr>> + '_ {
209 self.candidates.iter().map(|(peer, addrs)| {
210 addrs
211 .iter()
212 .cloned()
213 .map(|addr| addr.with(Protocol::P2p((*peer).into())))
214 .collect::<Vec<_>>()
215 })
216 }
217
218 pub fn list_reservation_peers(&self) -> impl Iterator<Item = &PeerId> + '_ {
219 self.reservation_peer.iter()
220 }
221
222 pub fn in_candidate_threshold(&self) -> bool {
223 self.candidates.len() >= self.limits.min_candidates
224 && self.candidates.len() <= self.limits.max_candidates
225 }
226
227 pub fn out_of_candidate_threshold(&self) -> bool {
228 self.candidates.len() < self.limits.min_candidates
229 || self.candidates.len() > self.limits.max_candidates
230 }
231
232 pub fn in_reservation_threshold(&self) -> bool {
233 self.reservation_peer.len() >= self.limits.min_reservation
234 && self.reservation_peer.len() <= self.limits.max_reservation
235 }
236
237 pub fn out_of_reservation_threshold(&self) -> bool {
238 self.reservation_peer.len() < self.limits.min_reservation
239 || self.reservation_peer.len() > self.limits.max_reservation
240 }
241
242 pub fn avg_rtt(&self, peer_id: PeerId) -> Option<u128> {
243 let rtts = self.candidates_rtt.get(&peer_id).copied()?;
244 let avg: u128 = rtts.iter().map(|duration| duration.as_millis()).sum();
245 let div = rtts.iter().filter(|i| !i.is_zero()).count() as u128;
247 let avg = avg / div;
248 Some(avg)
249 }
250
251 #[allow(dead_code)]
252 pub(crate) fn change_nat(&mut self, nat: Nat) {
254 self.nat_status = nat;
255 }
258
259 pub fn select_candidate(&mut self, peer_id: PeerId) {
260 if let Some(addrs) = self.candidates.get(&peer_id).cloned() {
262 if self.pending_reservation_peer.insert(peer_id) {
263 self.events.push_back(NetworkBehaviourAction::GenerateEvent(
264 Event::ReservationSelected { peer_id, addrs },
265 ));
266 }
267 }
268 }
269
270 pub fn find_candidates(&mut self, blacklist: bool) {
271 if blacklist {
272 for peer_id in self.candidates.keys() {
273 self.blacklist.insert(*peer_id, None);
274 }
275 }
276
277 self.candidates.clear();
278 self.candidates_rtt.clear();
279
280 let (tx, rx) = unbounded();
281
282 self.channel = Some(rx);
283
284 self.interval = Interval::new_at(
285 Instant::now() + Duration::from_secs(1),
286 Duration::from_secs(5),
287 );
288
289 self.events
290 .push_back(NetworkBehaviourAction::GenerateEvent(Event::FindCandidate(
291 tx,
292 )));
293 }
294
295 pub fn select_candidate_low_rtt(&mut self) {
300 if self.candidates.len() < self.limits.min_candidates {
301 warn!("Candidates are below threshold");
302 return;
303 }
304
305 if self.reservation_peer.len() >= self.limits.max_reservation {
306 warn!("Reservation is at its threshold. Will not continue with select");
307 return;
308 }
309
310 let mut best_candidate = None;
311 let mut last_rtt: Option<Duration> = None;
312
313 for peer_id in self.candidates.keys() {
314 if self.reservation_peer.contains(peer_id)
315 || self.blacklist.contains_key(peer_id)
316 || self.pending_reservation_peer.contains(peer_id)
317 {
318 continue;
319 }
320 let Some(avg_rtt) = self.avg_rtt(*peer_id) else {
321 continue;
322 };
323
324 if let Some(current) = last_rtt.as_mut() {
325 if avg_rtt < current.as_millis() {
326 *current = Duration::from_millis(avg_rtt as _);
327 best_candidate = Some(*peer_id);
328 }
329 } else {
330 last_rtt = Some(Duration::from_millis(avg_rtt as _));
331 best_candidate = Some(*peer_id);
332 }
333 }
334
335 let Some(peer_id) = best_candidate else {
339 warn!("No candidate was found");
340 return;
341 };
342
343 if self.pending_reservation_peer.contains(&peer_id) {
344 return;
345 }
346
347 if self.reservation_peer.get(&peer_id).is_some() {
348 return;
349 }
350
351 self.select_candidate(peer_id);
352 }
353
354 pub fn select_candidate_random(&mut self) {
355 if self.candidates.len() < self.limits.min_candidates {
356 warn!("Candidates are below threshold");
357 return;
358 }
359
360 if self.reservation_peer.len() >= self.limits.max_reservation {
361 warn!("Reservation is at its threshold. Will not continue with selection");
362 return;
363 }
364
365 let mut rng = rand::thread_rng();
366
367 let list = self.candidates.keys().copied().collect::<Vec<_>>();
368
369 let Some(candidate) = list
370 .choose(&mut rng) else {
371 return;
372 };
373
374 if self.reservation_peer.get(candidate).is_some() {
375 return;
376 }
377
378 self.select_candidate(*candidate);
379 }
380
381 pub fn set_candidate_rtt(&mut self, peer_id: PeerId, rtt: Duration) {
382 if self.candidates.contains_key(&peer_id) {
383 self.candidates_rtt
384 .entry(peer_id)
385 .and_modify(|r| {
386 r.rotate_left(1);
387 r[2] = rtt;
388 })
389 .or_insert([Duration::from_millis(0), Duration::from_millis(0), rtt]);
390 }
391 }
392
393 pub fn inject_candidate(&mut self, peer_id: PeerId, addrs: Vec<Multiaddr>) {
394 let candidates_size = self.candidates.len();
395
396 if candidates_size >= self.limits.max_candidates || self.blacklist.contains_key(&peer_id) {
397 return;
398 }
399
400 let mut filtered_addrs = vec![];
401
402 for addr in addrs {
403 if let Some(protocol) = addr.iter().next() {
404 let ip = match protocol {
410 Protocol::Ip4(ip) if !ip.is_private() => IpAddr::V4(ip),
412 Protocol::Ip6(ip) => IpAddr::V6(ip),
413 _ => continue,
414 };
415 if ip.is_loopback() {
417 continue;
418 }
419 }
420 filtered_addrs.push(addr);
421 }
422
423 *self.candidates.entry(peer_id).or_default() = filtered_addrs.clone();
424 self.events
425 .push_back(NetworkBehaviourAction::GenerateEvent(Event::Added {
426 peer_id,
427 addr: filtered_addrs,
428 }));
429 }
430
431 pub fn inject_relay_client_event(&mut self, event: RelayClientEvent) {
433 match event {
434 RelayClientEvent::ReservationReqAccepted { relay_peer_id, .. } => {
435 info!("Reservation accepted with {relay_peer_id}");
436 }
437 RelayClientEvent::ReservationReqFailed {
438 relay_peer_id,
439 error,
440 ..
441 } => {
442 self.reservation_peer.remove(&relay_peer_id);
443 self.candidates.remove(&relay_peer_id);
444 self.blacklist.insert(relay_peer_id, None);
445 log::error!("Reservation request failed {relay_peer_id}: {error}");
446 }
447 e => info!("Relay Client Event: {e:?}"),
448 }
449 }
450}
451
452impl NetworkBehaviour for AutoRelay {
453 type ConnectionHandler = DummyConnectionHandler;
454 type OutEvent = Event;
455
456 fn new_handler(&mut self) -> Self::ConnectionHandler {
457 DummyConnectionHandler
458 }
459
460 fn inject_connection_established(
461 &mut self,
462 peer_id: &PeerId,
463 connection_id: &ConnectionId,
464 endpoint: &ConnectedPoint,
465 _failed_addresses: Option<&Vec<Multiaddr>>,
466 _other_established: usize,
467 ) {
468 if let Entry::Occupied(mut entry) = self.pending_candidates.entry(*peer_id) {
473 if let ConnectedPoint::Dialer { address, .. } = endpoint {
474 let addresses = entry.get_mut();
475
476 let (_, address_without_peer) = extract_peer_id_from_multiaddr(address.clone());
477 if !addresses.contains(&address_without_peer) {
478 return;
479 }
480
481 if let Some(index) = addresses.iter().position(|x| *x == address_without_peer) {
482 addresses.swap_remove(index);
483 if addresses.is_empty() {
484 entry.remove();
485 }
486 }
487
488 self.candidates_connection
489 .insert(*connection_id, address.clone());
490
491 self.candidates
492 .entry(*peer_id)
493 .or_default()
494 .push(address_without_peer.clone());
495
496 self.events
497 .push_back(NetworkBehaviourAction::GenerateEvent(Event::Added {
498 peer_id: *peer_id,
499 addr: vec![address_without_peer],
500 }))
501 }
502 }
503 }
504
505 fn inject_connection_closed(
506 &mut self,
507 peer_id: &PeerId,
508 id: &ConnectionId,
509 _endpoint: &ConnectedPoint,
510 _handler: Self::ConnectionHandler,
511 _remaining_established: usize,
512 ) {
513 if let Entry::Occupied(mut entry) = self.candidates.entry(*peer_id) {
514 let addresses = entry.get_mut();
515
516 if let Some(address) = self.candidates_connection.remove(id) {
517 if let Some(pos) = addresses.iter().position(|a| *a == address) {
518 addresses.swap_remove(pos);
519 }
520
521 if addresses.is_empty() {
524 entry.remove();
525 }
526 }
527 }
528 }
529
530 fn inject_event(&mut self, _peer_id: PeerId, _connection: ConnectionId, _event: void::Void) {}
531
532 fn inject_new_listen_addr(&mut self, id: ListenerId, addr: &Multiaddr) {
533
534 if self.reservation.contains_key(&id) {
535 return;
536 }
537
538 if !addr
539 .iter()
540 .any(|proto| matches!(proto, Protocol::P2pCircuit | Protocol::P2p(_)))
541 {
542 return;
544 }
545
546 let mut addr = addr.clone();
547
548 let Some(Protocol::P2p(_)) = addr.pop() else {
550 return;
551 };
552
553 let Some(Protocol::P2pCircuit) = addr.pop() else {
554 return;
555 };
556
557 let Some(peer_id) = peer_id_from_multiaddr(addr.clone()) else {
558 return;
559 };
560
561 self.pending_reservation_peer.remove(&peer_id);
562 self.reservation.insert(id, addr);
563 self.reservation_peer.insert(peer_id);
564 }
565
566 fn inject_expired_listen_addr(&mut self, _id: ListenerId, _addr: &Multiaddr) {
567 }
569
570 fn inject_listener_closed(&mut self, _id: ListenerId, _reason: Result<(), &std::io::Error>) {
571 }
573
574 fn inject_listener_error(&mut self, _id: ListenerId, _: &(dyn std::error::Error + 'static)) {}
575
576 fn inject_dial_failure(
577 &mut self,
578 peer_id: Option<PeerId>,
579 _handler: Self::ConnectionHandler,
580 error: &DialError,
581 ) {
582 if let Some(peer_id) = peer_id {
583 if let Entry::Occupied(mut entry) = self.pending_candidates.entry(peer_id) {
584 let addresses = entry.get_mut();
585
586 match error {
587 DialError::Transport(multiaddrs) => {
588 for (addr, _) in multiaddrs {
589 let (peer, maddr) = extract_peer_id_from_multiaddr(addr.clone());
590 if let Some(peer) = peer {
591 if peer != peer_id {
592 warn!("PeerId mismatch. {peer} != {peer_id}");
595 }
596 }
597
598 if let Some(pos) = addresses.iter().position(|a| *a == maddr) {
599 addresses.swap_remove(pos);
600 }
601 }
602 }
603 _e => {}
604 }
605
606 if addresses.is_empty() {
607 entry.remove();
608 }
609 }
610 }
611 }
612
613 fn poll(
614 &mut self,
615 cx: &mut Context,
616 _: &mut impl PollParameters,
617 ) -> Poll<swarm::NetworkBehaviourAction<Self::OutEvent, Self::ConnectionHandler>> {
618 if let Some(event) = self.events.pop_front() {
619 return Poll::Ready(event);
620 }
621
622 while let Poll::Ready(Some(_)) = self.interval.poll_next_unpin(cx) {
623 self.select_candidate_low_rtt();
624 }
625
626 Poll::Pending
627 }
628}
629
630pub(crate) fn peer_id_from_multiaddr(addr: Multiaddr) -> Option<PeerId> {
631 let (peer, _) = extract_peer_id_from_multiaddr(addr);
632 peer
633}
634
635#[allow(dead_code)]
636pub(crate) fn extract_peer_id_from_multiaddr(mut addr: Multiaddr) -> (Option<PeerId>, Multiaddr) {
637 match addr.pop() {
638 Some(Protocol::P2p(hash)) => match PeerId::from_multihash(hash) {
639 Ok(id) => (Some(id), addr),
640 _ => (None, addr),
641 },
642 _ => (None, addr),
643 }
644}