1mod handler;
2
3use std::{
4 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
5 error::Error,
6 hash::Hash,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use futures::StreamExt;
12use futures_timer::Delay;
13use libp2p::core::transport::PortUse;
14use libp2p::{
15 core::Endpoint,
16 multiaddr::Protocol,
17 swarm::{
18 derive_prelude::{ConnectionEstablished, ListenerId},
19 dial_opts::DialOpts,
20 AddressChange, ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure,
21 ExpiredListenAddr, FromSwarm, ListenOpts, ListenerClosed, ListenerError, NetworkBehaviour,
22 NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
23 },
24 Multiaddr, PeerId,
25};
26use rand::prelude::IndexedRandom;
27
28#[derive(Debug)]
29pub enum Event {
30 ReservationSuccessful {
31 peer_id: PeerId,
32 initial_addr: Multiaddr,
33 },
34 ReservationClosed {
35 peer_id: PeerId,
36 result: Result<(), Box<dyn Error + Send>>,
37 },
38 ReservationFailure {
39 peer_id: PeerId,
40 result: Box<dyn Error + Send>,
41 },
42 FindRelays {
43 namespace: Option<String>,
45 channel: futures::channel::mpsc::Sender<HashSet<PeerId>>,
47 },
48}
49
50#[derive(Debug, Clone)]
51struct Connection {
52 pub peer_id: PeerId,
53 pub id: ConnectionId,
54 pub addr: Multiaddr,
55 pub candidacy: Candidate,
56 pub rtt: Option<[Duration; 3]>,
57}
58
59#[derive(Debug, Clone)]
60enum Candidate {
61 Pending,
62 Unsupported,
63 Confirmed {
64 listener_id: Option<ListenerId>,
65 addresses: Vec<Multiaddr>,
66 },
67}
68
69impl PartialEq for Connection {
70 fn eq(&self, other: &Self) -> bool {
71 self.id.eq(&other.id)
72 }
73}
74
75impl Eq for Connection {}
76
77#[allow(dead_code)]
78#[derive(Clone, Debug, PartialEq, Eq)]
79struct PendingReservation {
80 peer_id: PeerId,
81 connection_id: ConnectionId,
82 listener_id: ListenerId,
83}
84
85impl Hash for PendingReservation {
86 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
87 self.listener_id.hash(state)
88 }
89}
90
91#[derive(Debug)]
92#[allow(dead_code)]
93enum ReconnectState {
94 Idle {
95 backoff: bool,
96 delay: Delay,
97 },
98 Pending {
99 connection_id: ConnectionId,
100 backoff: bool,
101 },
102}
103
104#[derive(Default, Debug)]
105#[allow(dead_code)]
106pub struct Behaviour {
107 events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
108 relays: HashMap<PeerId, Vec<Multiaddr>>,
109 connections: HashMap<PeerId, Vec<Connection>>,
110
111 reconnect: HashMap<PeerId, ReconnectState>,
112
113 discovery_channel: HashMap<u64, futures::channel::mpsc::Receiver<HashSet<PeerId>>>,
114
115 pending_connection: HashSet<ConnectionId>,
116 pending_selection: HashSet<PeerId>,
117 config: Config,
118}
119
/// Settings for [`Behaviour`]. The default is everything off, no limit, zero back-off.
#[derive(Default, Debug)]
pub struct Config {
    /// NOTE(review): not read by the code visible here; presumably enables automatic
    /// relay selection -- confirm.
    pub auto_relay: bool,

    /// When set, adding a relay address queues a dial to that peer, and connections to
    /// peers missing from the relay list are tracked as well.
    pub auto_connect: bool,

    /// NOTE(review): not read by the code visible here; presumably caps the number of
    /// relays in use -- confirm.
    pub limit: Option<u64>,

    /// NOTE(review): not read by the code visible here; presumably the delay between
    /// reconnect attempts -- confirm.
    pub backoff: Duration,
}
134
135impl Behaviour {
136 pub fn new(config: Config) -> Behaviour {
137 Self {
138 config,
139 events: VecDeque::default(),
140 relays: HashMap::default(),
141 connections: HashMap::default(),
142 reconnect: HashMap::default(),
143 discovery_channel: HashMap::default(),
144 pending_connection: HashSet::default(),
145 pending_selection: HashSet::default(),
146 }
147 }
148
149 pub fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
150 match self.relays.entry(peer_id) {
151 Entry::Vacant(entry) => {
152 entry.insert(vec![addr.clone()]);
153 }
154 Entry::Occupied(mut entry) => {
155 let list = entry.get_mut();
156 if list.contains(&addr) {
157 return;
158 }
159 list.push(addr.clone());
160 }
161 }
162 if self.config.auto_connect {
163 if let Entry::Occupied(entry) = self.connections.entry(peer_id) {
164 if entry.get().iter().any(|connection| connection.addr == addr) {
165 return;
166 }
167 }
168
169 let opts = DialOpts::peer_id(peer_id).build();
170 self.events.push_back(ToSwarm::Dial { opts })
171 }
172 }
173
174 pub fn remove_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
175 if let Entry::Occupied(mut entry) = self.relays.entry(peer_id) {
176 let list = entry.get_mut();
177
178 if let Some(connection) = self.connections.get(&peer_id).and_then(|connections| {
179 connections
180 .iter()
181 .find(|connection| connection.addr.eq(&addr))
182 }) {
183 if let Candidate::Confirmed {
184 listener_id: Some(id),
185 ..
186 } = connection.candidacy
187 {
188 self.events.push_back(ToSwarm::RemoveListener { id });
189 }
190 }
191
192 list.retain(|inner_addr| addr.ne(inner_addr));
193 if list.is_empty() {
194 entry.remove();
195 }
196 }
197 }
198
199 pub fn list_relays(&self) -> impl Iterator<Item = (&PeerId, &Vec<Multiaddr>)> {
200 self.relays.iter()
201 }
202
203 pub fn list_active_relays(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
204 self.connections
205 .iter()
206 .filter(|(_, connections)| {
207 connections.iter().any(|connection| {
208 matches!(
209 connection.candidacy,
210 Candidate::Confirmed {
211 listener_id: Some(_),
212 ..
213 }
214 )
215 })
216 })
217 .map(|(peer_id, connections)| {
218 (
219 *peer_id,
220 connections
221 .iter()
222 .map(|connection| &connection.addr)
223 .cloned()
224 .collect::<Vec<_>>(),
225 )
226 })
227 .collect()
228 }
229
230 #[allow(dead_code)]
231 fn avg_rtt(&self, connection: &Connection) -> u128 {
232 let rtts = connection.rtt.unwrap_or_default();
233 let avg: u128 = rtts.iter().map(|duration| duration.as_millis()).sum();
234 let div = rtts.iter().filter(|i| !i.is_zero()).count() as u128;
236 avg / div
237 }
238
239 pub fn select(&mut self, peer_id: PeerId) {
240 if !self.relays.contains_key(&peer_id) {
241 self.events
242 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
243 peer_id,
244 result: Box::new(std::io::Error::other("Peer is not added in relay list")),
245 }));
246 return;
247 }
248
249 if self.pending_selection.contains(&peer_id) {
250 return;
251 }
252
253 if !self.connections.contains_key(&peer_id) {
254 let opts = DialOpts::peer_id(peer_id).build();
255 let id = opts.connection_id();
256 self.pending_connection.insert(id);
257 self.events.push_back(ToSwarm::Dial { opts });
258 self.pending_selection.insert(peer_id);
259 return;
260 }
261
262 let connections = match self.connections.get_mut(&peer_id) {
263 Some(conns) => conns,
264 None => return,
265 };
266
267 if connections.is_empty() {
268 return;
269 }
270
271 let mut temp_connections = connections.clone();
272 let mut rng = rand::rng();
273 let connection = loop {
274 if temp_connections.is_empty() {
275 self.events
276 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
277 peer_id,
278 result: Box::new(std::io::Error::other(
279 "no qualified connections available",
280 )),
281 }));
282 return;
283 }
284
285 let connection = temp_connections
286 .choose(&mut rng)
287 .cloned()
288 .expect("Connection available");
289
290 if let Candidate::Confirmed {
291 listener_id: Some(_),
292 ..
293 } = connection.candidacy
294 {
295 temp_connections.retain(|c| c.id != connection.id);
298 continue;
299 }
300
301 break connections
302 .iter_mut()
303 .find(|c| c.id == connection.id)
304 .expect("Connection available");
305 };
306
307 if matches!(connection.candidacy, Candidate::Pending) {
308 self.pending_selection.insert(peer_id);
309 return;
310 }
311
312 let relay_addr = connection.addr.clone().with(Protocol::P2pCircuit);
313
314 let opts = ListenOpts::new(relay_addr);
315
316 let id = opts.listener_id();
317
318 if let Candidate::Confirmed { listener_id, .. } = &mut connection.candidacy {
319 *listener_id = Some(id);
320 }
321
322 self.events.push_back(ToSwarm::ListenOn { opts });
323 }
324
325 pub fn random_select(&mut self) -> Option<PeerId> {
326 let relay_peers = self.relays.keys().copied().collect::<Vec<_>>();
327 if relay_peers.is_empty() {
328 return None;
329 }
330
331 let mut rng = rand::rng();
332
333 let peer_id = relay_peers.choose(&mut rng)?;
334
335 self.select(*peer_id);
336
337 Some(*peer_id)
338 }
339
340 pub fn disable_relay(&mut self, peer_id: PeerId) {
341 for connection in self
342 .connections
343 .iter()
344 .filter(|(peer, _)| peer_id == **peer)
345 .flat_map(|(_, connections)| connections)
346 {
347 if let Candidate::Confirmed { .. } = connection.candidacy {
348 let connection = libp2p::swarm::CloseConnection::One(connection.id);
350 self.events.push_back(ToSwarm::CloseConnection {
351 peer_id,
352 connection,
353 });
354 }
355 }
356 }
357
358 pub fn set_peer_rtt(&mut self, peer_id: PeerId, connection_id: ConnectionId, rtt: Duration) {
359 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
360 let connections = entry.get_mut();
361 if let Some(connection) = connections
362 .iter_mut()
363 .find(|connection| connection.id == connection_id)
364 {
365 match connection.rtt.as_mut() {
366 Some(connection_rtt) => {
367 connection_rtt.rotate_left(1);
368 connection_rtt[2] = rtt;
369 }
370 None => connection.rtt = Some([Duration::ZERO, Duration::ZERO, rtt]),
371 }
372 }
373 }
374 }
375
376 fn on_listen_on(
377 &mut self,
378 NewListenAddr {
379 listener_id,
380 addr: direct_addr,
381 }: NewListenAddr,
382 ) {
383 if !direct_addr
384 .iter()
385 .any(|proto| matches!(proto, Protocol::P2pCircuit))
386 {
387 return;
388 }
389
390 for connection in self
391 .connections
392 .values_mut()
393 .flatten()
394 .filter(|connection| {
395 if let Candidate::Confirmed {
396 listener_id: Some(id),
397 ..
398 } = connection.candidacy
399 {
400 id == listener_id
401 } else {
402 false
403 }
404 })
405 {
406 match &mut connection.candidacy {
407 Candidate::Confirmed {
408 listener_id: id,
409 addresses,
410 } => {
411 *id = Some(listener_id);
412 let first = addresses.is_empty();
413 if !addresses.contains(direct_addr) {
414 addresses.push(direct_addr.clone());
415 if first {
416 self.events.push_back(ToSwarm::GenerateEvent(
417 Event::ReservationSuccessful {
418 peer_id: connection.peer_id,
419 initial_addr: direct_addr.clone(),
420 },
421 ))
422 }
423 }
424 }
425 Candidate::Pending | Candidate::Unsupported => {
426 }
428 };
429 }
430 }
431
432 fn on_listener_close(
433 &mut self,
434 ListenerClosed {
435 listener_id,
436 reason,
437 }: ListenerClosed,
438 ) {
439 let Some(connection) =
440 self.connections
441 .values_mut()
442 .flatten()
443 .find(|connection| match connection.candidacy {
444 Candidate::Confirmed {
445 listener_id: Some(id),
446 ..
447 } => id == listener_id,
448 _ => false,
449 })
450 else {
451 return;
452 };
453
454 if let Candidate::Confirmed {
455 listener_id,
456 addresses,
457 } = &mut connection.candidacy
458 {
459 listener_id.take();
460 let addrs = std::mem::take(addresses);
461 let has_addresses = addrs.is_empty();
462
463 for addr in addrs {
464 self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
465 }
466
467 match (has_addresses, reason) {
468 (true, result) => {
469 self.events
470 .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
471 peer_id: connection.peer_id,
472 result: result
473 .map_err(|e| std::io::Error::new(e.kind(), e.to_string()))
474 .map_err(|e| Box::new(e) as Box<_>),
475 }))
476 }
477 (false, Err(e)) => {
478 self.events
479 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
480 peer_id: connection.peer_id,
481 result: Box::new(std::io::Error::new(e.kind(), e.to_string())),
482 }))
483 }
484 _ => {}
485 }
486 }
487 }
488
489 fn on_listener_error(&mut self, ListenerError { listener_id, err }: ListenerError) {
490 let Some(connection) =
491 self.connections
492 .values_mut()
493 .flatten()
494 .find(|connection| match connection.candidacy {
495 Candidate::Confirmed {
496 listener_id: Some(id),
497 ..
498 } => id == listener_id,
499 _ => false,
500 })
501 else {
502 return;
503 };
504
505 if let Candidate::Confirmed {
506 listener_id,
507 addresses,
508 } = &mut connection.candidacy
509 {
510 listener_id.take();
511 let addrs = std::mem::take(addresses);
512 for addr in addrs {
513 self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
514 }
515 self.events
516 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
517 peer_id: connection.peer_id,
518 result: Box::new(std::io::Error::other(err.to_string())),
519 }))
520 }
521 }
522
523 fn on_listener_expired(&mut self, ExpiredListenAddr { listener_id, addr }: ExpiredListenAddr) {
524 let Some(connection) =
525 self.connections
526 .values_mut()
527 .flatten()
528 .find(|connection| match connection.candidacy {
529 Candidate::Confirmed {
530 listener_id: Some(id),
531 ..
532 } => id == listener_id,
533 _ => false,
534 })
535 else {
536 return;
537 };
538
539 if let Candidate::Confirmed { addresses, .. } = &mut connection.candidacy {
540 if !addresses.contains(addr) {
541 return;
542 }
543
544 addresses.retain(|a| a != addr);
545
546 self.events
547 .push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
548 }
549 }
550
551 fn on_address_change(
552 &mut self,
553 AddressChange {
554 peer_id,
555 connection_id,
556 old,
557 new,
558 }: AddressChange,
559 ) {
560 let Some(connections) = self.connections.get_mut(&peer_id) else {
561 return;
562 };
563
564 let Some(connection) = connections
565 .iter_mut()
566 .find(|connection| connection.id == connection_id)
567 else {
568 return;
569 };
570
571 let old_addr = match old {
572 libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
573 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
574 };
575
576 let new_addr = match new {
577 libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
578 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
579 };
580
581 if old_addr == new_addr {
582 return;
583 }
584
585 connection.addr = new_addr.clone();
586 }
587
588 fn on_dial_failure(
589 &mut self,
590 DialFailure {
591 peer_id,
592 error,
593 connection_id,
594 }: DialFailure,
595 ) {
596 if !self.pending_connection.remove(&connection_id) {
597 return;
598 }
599
600 let Some(peer_id) = peer_id else {
601 return;
602 };
603
604 self.events
605 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
606 peer_id,
607 result: Box::new(std::io::Error::other(error.to_string())),
608 }));
609
610 }
630
631 fn on_connection_established(
632 &mut self,
633 ConnectionEstablished {
634 peer_id,
635 connection_id,
636 endpoint,
637 ..
638 }: ConnectionEstablished,
639 ) {
640 self.pending_connection.remove(&connection_id);
641
642 let addr = match endpoint {
643 libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
644 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
645 };
646
647 match self.relays.entry(peer_id) {
648 Entry::Occupied(entry) => {
649 let mut addr = addr.clone();
650 addr.pop();
651
652 if !entry.get().contains(&addr) {
653 return;
654 }
655 }
656 Entry::Vacant(_) if self.config.auto_connect => {}
657 _ => return,
658 };
659
660 let connection = Connection {
661 peer_id,
662 id: connection_id,
663 addr: addr.clone(),
664 candidacy: Candidate::Pending,
665 rtt: None,
666 };
667
668 self.connections
669 .entry(peer_id)
670 .or_default()
671 .push(connection);
672
673 if self.pending_selection.remove(&peer_id) {
674 self.select(peer_id);
675 }
676 }
677
678 fn on_connection_closed(
679 &mut self,
680 ConnectionClosed {
681 peer_id,
682 connection_id,
683 ..
684 }: ConnectionClosed<'_>,
685 ) {
686 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
687 let connections = entry.get_mut();
688 let Some(connection) = connections
689 .iter_mut()
690 .find(|connection| connection.id == connection_id)
691 else {
692 return;
693 };
694
695 if let Candidate::Confirmed {
699 listener_id,
700 addresses,
701 } = &mut connection.candidacy
702 {
703 if let Some(listener_id) = listener_id.take() {
704 let addrs = std::mem::take(addresses);
705 for addr in addrs {
706 self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
707 }
708 self.events
709 .push_back(ToSwarm::RemoveListener { id: listener_id });
710
711 self.events
712 .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
713 peer_id: connection.peer_id,
714 result: Ok(()),
715 }))
716 }
717 }
718
719 connections.retain(|connection| connection.id != connection_id);
720
721 if connections.is_empty() {
722 entry.remove();
723 }
724 }
725 }
726
727 pub fn process_relay_event(&mut self, _: libp2p::relay::client::Event) {
728 }
739}
740
741impl NetworkBehaviour for Behaviour {
742 type ToSwarm = Event;
743 type ConnectionHandler = handler::Handler;
744
745 fn handle_established_inbound_connection(
746 &mut self,
747 _connection_id: ConnectionId,
748 _peer: PeerId,
749 _local_addr: &Multiaddr,
750 _remote_addr: &Multiaddr,
751 ) -> Result<THandler<Self>, ConnectionDenied> {
752 Ok(handler::Handler::default())
753 }
754
755 fn handle_established_outbound_connection(
756 &mut self,
757 _connection_id: ConnectionId,
758 _peer: PeerId,
759 _addr: &Multiaddr,
760 _role_override: Endpoint,
761 _: PortUse,
762 ) -> Result<THandler<Self>, ConnectionDenied> {
763 Ok(handler::Handler::default())
764 }
765
766 fn handle_pending_outbound_connection(
767 &mut self,
768 _: ConnectionId,
769 maybe_peer: Option<PeerId>,
770 _: &[Multiaddr],
771 _: Endpoint,
772 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
773 let addrs = maybe_peer
774 .and_then(|peer_id| self.relays.get(&peer_id))
775 .cloned()
776 .unwrap_or_default();
777
778 Ok(addrs)
779 }
780
781 fn on_swarm_event(&mut self, event: FromSwarm) {
782 match event {
783 FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
784 FromSwarm::ConnectionClosed(event) => self.on_connection_closed(event),
785 FromSwarm::NewListenAddr(event) => self.on_listen_on(event),
786 FromSwarm::ListenerClosed(event) => self.on_listener_close(event),
787 FromSwarm::DialFailure(event) => self.on_dial_failure(event),
788 FromSwarm::ListenerError(event) => self.on_listener_error(event),
789 FromSwarm::ExpiredListenAddr(event) => self.on_listener_expired(event),
790 FromSwarm::AddressChange(event) => self.on_address_change(event),
791 _ => {}
792 }
793 }
794
795 fn on_connection_handler_event(
796 &mut self,
797 peer_id: PeerId,
798 connection_id: ConnectionId,
799 event: THandlerOutEvent<Self>,
800 ) {
801 match event {
802 handler::Out::Supported => {
803 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
804 let list = entry.get_mut();
805 if let Some(connection) = list
806 .iter_mut()
807 .find(|connection| connection.id == connection_id)
808 {
809 let canadate_state = &mut connection.candidacy;
810
811 if matches!(canadate_state, Candidate::Pending | Candidate::Unsupported) {
812 *canadate_state = Candidate::Confirmed {
813 listener_id: None,
814 addresses: vec![],
815 };
816 if self.pending_selection.remove(&peer_id) {
817 self.select(peer_id);
818 }
819 }
820 }
821 }
822 }
823 handler::Out::Unsupported => {
824 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
825 let list = entry.get_mut();
826 if let Some(connection) = list
827 .iter_mut()
828 .find(|connection| connection.id == connection_id)
829 {
830 let canadate_state = &mut connection.candidacy;
831
832 if let Candidate::Confirmed {
833 listener_id: Some(id),
834 ..
835 } = canadate_state
836 {
837 let id = *id;
838 self.events.push_back(ToSwarm::RemoveListener { id });
839 }
840
841 *canadate_state = Candidate::Unsupported;
842 self.pending_selection.remove(&peer_id);
843 }
844 }
845 }
846 }
847 }
848
849 fn poll(
850 &mut self,
851 cx: &mut Context<'_>,
852 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
853 if let Some(event) = self.events.pop_front() {
854 return Poll::Ready(event);
855 }
856
857 self.discovery_channel
858 .retain(|_, rx| match rx.poll_next_unpin(cx) {
859 Poll::Ready(Some(list)) => {
860 for peer_id in list {
861 self.relays.entry(peer_id).or_default();
862 }
863 false
864 }
865 Poll::Ready(None) => false,
866 Poll::Pending => true,
867 });
868
869 Poll::Pending
870 }
871}