1mod handler;
2
3use std::{
4 collections::{hash_map::Entry, HashMap, HashSet, VecDeque},
5 error::Error,
6 hash::Hash,
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use futures::StreamExt;
12use futures_timer::Delay;
13use libp2p::core::transport::PortUse;
14use libp2p::{
15 core::Endpoint,
16 multiaddr::Protocol,
17 swarm::{
18 derive_prelude::{ConnectionEstablished, ListenerId},
19 dial_opts::DialOpts,
20 AddressChange, ConnectionClosed, ConnectionDenied, ConnectionId, DialFailure,
21 ExpiredListenAddr, FromSwarm, ListenOpts, ListenerClosed, ListenerError, NetworkBehaviour,
22 NewListenAddr, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
23 },
24 Multiaddr, PeerId,
25};
26use rand::seq::SliceRandom;
27
28#[derive(Debug)]
29pub enum Event {
30 ReservationSuccessful {
31 peer_id: PeerId,
32 initial_addr: Multiaddr,
33 },
34 ReservationClosed {
35 peer_id: PeerId,
36 result: Result<(), Box<dyn Error + Send>>,
37 },
38 ReservationFailure {
39 peer_id: PeerId,
40 result: Box<dyn Error + Send>,
41 },
42 FindRelays {
43 namespace: Option<String>,
45 channel: futures::channel::mpsc::Sender<HashSet<PeerId>>,
47 },
48}
49
50#[derive(Debug, Clone)]
51struct Connection {
52 pub peer_id: PeerId,
53 pub id: ConnectionId,
54 pub addr: Multiaddr,
55 pub candidacy: Candidate,
56 pub rtt: Option<[Duration; 3]>,
57}
58
59#[derive(Debug, Clone)]
60enum Candidate {
61 Pending,
62 Unsupported,
63 Confirmed {
64 listener_id: Option<ListenerId>,
65 addresses: Vec<Multiaddr>,
66 },
67}
68
69impl PartialEq for Connection {
70 fn eq(&self, other: &Self) -> bool {
71 self.id.eq(&other.id)
72 }
73}
74
75impl Eq for Connection {}
76
77#[allow(dead_code)]
78#[derive(Clone, Debug, PartialEq, Eq)]
79struct PendingReservation {
80 peer_id: PeerId,
81 connection_id: ConnectionId,
82 listener_id: ListenerId,
83}
84
85impl Hash for PendingReservation {
86 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
87 self.listener_id.hash(state)
88 }
89}
90
91#[derive(Debug)]
92#[allow(dead_code)]
93enum ReconnectState {
94 Idle {
95 backoff: bool,
96 delay: Delay,
97 },
98 Pending {
99 connection_id: ConnectionId,
100 backoff: bool,
101 },
102}
103
104#[derive(Default, Debug)]
105#[allow(dead_code)]
106pub struct Behaviour {
107 events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
108 relays: HashMap<PeerId, Vec<Multiaddr>>,
109 connections: HashMap<PeerId, Vec<Connection>>,
110
111 reconnect: HashMap<PeerId, ReconnectState>,
112
113 discovery_channel: HashMap<u64, futures::channel::mpsc::Receiver<HashSet<PeerId>>>,
114
115 pending_connection: HashSet<ConnectionId>,
116 pending_selection: HashSet<PeerId>,
117 config: Config,
118}
119
120#[derive(Debug, Default)]
121pub struct Config {
122 pub auto_relay: bool,
124
125 pub auto_connect: bool,
127
128 pub limit: Option<u64>,
130
131 pub backoff: Duration,
133}
134
135impl Behaviour {
136 pub fn new(config: Config) -> Behaviour {
137 Self {
138 config,
139 events: VecDeque::default(),
140 relays: HashMap::default(),
141 connections: HashMap::default(),
142 reconnect: HashMap::default(),
143 discovery_channel: HashMap::default(),
144 pending_connection: HashSet::default(),
145 pending_selection: HashSet::default(),
146 }
147 }
148
149 pub fn add_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
150 match self.relays.entry(peer_id) {
151 Entry::Vacant(entry) => {
152 entry.insert(vec![addr.clone()]);
153 }
154 Entry::Occupied(mut entry) => {
155 let list = entry.get_mut();
156 if list.contains(&addr) {
157 return;
158 }
159 list.push(addr.clone());
160 }
161 }
162 if self.config.auto_connect {
163 if let Entry::Occupied(entry) = self.connections.entry(peer_id) {
164 if entry.get().iter().any(|connection| connection.addr == addr) {
165 return;
166 }
167 }
168
169 let opts = DialOpts::peer_id(peer_id).build();
170 self.events.push_back(ToSwarm::Dial { opts })
171 }
172 }
173
174 pub fn remove_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
175 if let Entry::Occupied(mut entry) = self.relays.entry(peer_id) {
176 let list = entry.get_mut();
177
178 if let Some(connection) = self.connections.get(&peer_id).and_then(|connections| {
179 connections
180 .iter()
181 .find(|connection| connection.addr.eq(&addr))
182 }) {
183 if let Candidate::Confirmed {
184 listener_id: Some(id),
185 ..
186 } = connection.candidacy
187 {
188 self.events.push_back(ToSwarm::RemoveListener { id });
189 }
190 }
191
192 list.retain(|inner_addr| addr.ne(inner_addr));
193 if list.is_empty() {
194 entry.remove();
195 }
196 }
197 }
198
199 pub fn list_relays(&self) -> impl Iterator<Item = (&PeerId, &Vec<Multiaddr>)> {
200 self.relays.iter()
201 }
202
203 pub fn list_active_relays(&self) -> Vec<(PeerId, Vec<Multiaddr>)> {
204 self.connections
205 .iter()
206 .filter(|(_, connections)| {
207 connections.iter().any(|connection| {
208 matches!(
209 connection.candidacy,
210 Candidate::Confirmed {
211 listener_id: Some(_),
212 ..
213 }
214 )
215 })
216 })
217 .map(|(peer_id, connections)| {
218 (
219 *peer_id,
220 connections
221 .iter()
222 .map(|connection| &connection.addr)
223 .cloned()
224 .collect::<Vec<_>>(),
225 )
226 })
227 .collect()
228 }
229
230 #[allow(dead_code)]
231 fn avg_rtt(&self, connection: &Connection) -> u128 {
232 let rtts = connection.rtt.unwrap_or_default();
233 let avg: u128 = rtts.iter().map(|duration| duration.as_millis()).sum();
234 let div = rtts.iter().filter(|i| !i.is_zero()).count() as u128;
236 avg / div
237 }
238
239 pub fn select(&mut self, peer_id: PeerId) {
240 if !self.relays.contains_key(&peer_id) {
241 self.events
242 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
243 peer_id,
244 result: Box::new(std::io::Error::new(
245 std::io::ErrorKind::Other,
246 "Peer is not added in relay list",
247 )),
248 }));
249 return;
250 }
251
252 if self.pending_selection.contains(&peer_id) {
253 return;
254 }
255
256 if !self.connections.contains_key(&peer_id) {
257 let opts = DialOpts::peer_id(peer_id).build();
258 let id = opts.connection_id();
259 self.pending_connection.insert(id);
260 self.events.push_back(ToSwarm::Dial { opts });
261 self.pending_selection.insert(peer_id);
262 return;
263 }
264
265 let connections = match self.connections.get_mut(&peer_id) {
266 Some(conns) => conns,
267 None => return,
268 };
269
270 if connections.is_empty() {
271 return;
272 }
273
274 let mut temp_connections = connections.clone();
275 let mut rng = rand::thread_rng();
276 let connection = loop {
277 if temp_connections.is_empty() {
278 self.events
279 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
280 peer_id,
281 result: Box::new(std::io::Error::new(
282 std::io::ErrorKind::Other,
283 "no qualified connections available",
284 )),
285 }));
286 return;
287 }
288
289 let connection = temp_connections
290 .choose(&mut rng)
291 .cloned()
292 .expect("Connection available");
293
294 if let Candidate::Confirmed {
295 listener_id: Some(_),
296 ..
297 } = connection.candidacy
298 {
299 temp_connections.retain(|c| c.id != connection.id);
302 continue;
303 }
304
305 break connections
306 .iter_mut()
307 .find(|c| c.id == connection.id)
308 .expect("Connection available");
309 };
310
311 if matches!(connection.candidacy, Candidate::Pending) {
312 self.pending_selection.insert(peer_id);
313 return;
314 }
315
316 let relay_addr = connection.addr.clone().with(Protocol::P2pCircuit);
317
318 let opts = ListenOpts::new(relay_addr);
319
320 let id = opts.listener_id();
321
322 if let Candidate::Confirmed { listener_id, .. } = &mut connection.candidacy {
323 *listener_id = Some(id);
324 }
325
326 self.events.push_back(ToSwarm::ListenOn { opts });
327 }
328
329 pub fn random_select(&mut self) -> Option<PeerId> {
330 let relay_peers = self.relays.keys().copied().collect::<Vec<_>>();
331 if relay_peers.is_empty() {
332 return None;
333 }
334
335 let mut rng = rand::thread_rng();
336
337 let peer_id = relay_peers.choose(&mut rng)?;
338
339 self.select(*peer_id);
340
341 Some(*peer_id)
342 }
343
344 pub fn disable_relay(&mut self, peer_id: PeerId) {
345 for connection in self
346 .connections
347 .iter()
348 .filter(|(peer, _)| peer_id == **peer)
349 .flat_map(|(_, connections)| connections)
350 {
351 if let Candidate::Confirmed { .. } = connection.candidacy {
352 let connection = libp2p::swarm::CloseConnection::One(connection.id);
354 self.events.push_back(ToSwarm::CloseConnection {
355 peer_id,
356 connection,
357 });
358 }
359 }
360 }
361
362 pub fn set_peer_rtt(&mut self, peer_id: PeerId, connection_id: ConnectionId, rtt: Duration) {
363 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
364 let connections = entry.get_mut();
365 if let Some(connection) = connections
366 .iter_mut()
367 .find(|connection| connection.id == connection_id)
368 {
369 match connection.rtt.as_mut() {
370 Some(connection_rtt) => {
371 connection_rtt.rotate_left(1);
372 connection_rtt[2] = rtt;
373 }
374 None => connection.rtt = Some([Duration::ZERO, Duration::ZERO, rtt]),
375 }
376 }
377 }
378 }
379
380 fn on_listen_on(
381 &mut self,
382 NewListenAddr {
383 listener_id,
384 addr: direct_addr,
385 }: NewListenAddr,
386 ) {
387 if !direct_addr
388 .iter()
389 .any(|proto| matches!(proto, Protocol::P2pCircuit))
390 {
391 return;
392 }
393
394 for connection in self
395 .connections
396 .values_mut()
397 .flatten()
398 .filter(|connection| {
399 if let Candidate::Confirmed {
400 listener_id: Some(id),
401 ..
402 } = connection.candidacy
403 {
404 id == listener_id
405 } else {
406 false
407 }
408 })
409 {
410 match &mut connection.candidacy {
411 Candidate::Confirmed {
412 listener_id: id,
413 addresses,
414 } => {
415 *id = Some(listener_id);
416 let first = addresses.is_empty();
417 if !addresses.contains(direct_addr) {
418 addresses.push(direct_addr.clone());
419 if first {
420 self.events.push_back(ToSwarm::GenerateEvent(
421 Event::ReservationSuccessful {
422 peer_id: connection.peer_id,
423 initial_addr: direct_addr.clone(),
424 },
425 ))
426 }
427 }
428 }
429 Candidate::Pending | Candidate::Unsupported => {
430 }
432 };
433 }
434 }
435
436 fn on_listener_close(
437 &mut self,
438 ListenerClosed {
439 listener_id,
440 reason,
441 }: ListenerClosed,
442 ) {
443 let Some(connection) =
444 self.connections
445 .values_mut()
446 .flatten()
447 .find(|connection| match connection.candidacy {
448 Candidate::Confirmed {
449 listener_id: Some(id),
450 ..
451 } => id == listener_id,
452 _ => false,
453 })
454 else {
455 return;
456 };
457
458 if let Candidate::Confirmed {
459 listener_id,
460 addresses,
461 } = &mut connection.candidacy
462 {
463 listener_id.take();
464 let addrs = std::mem::take(addresses);
465 let has_addresses = addrs.is_empty();
466
467 for addr in addrs {
468 self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
469 }
470
471 match (has_addresses, reason) {
472 (true, result) => {
473 self.events
474 .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
475 peer_id: connection.peer_id,
476 result: result
477 .map_err(|e| std::io::Error::new(e.kind(), e.to_string()))
478 .map_err(|e| Box::new(e) as Box<_>),
479 }))
480 }
481 (false, Err(e)) => {
482 self.events
483 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
484 peer_id: connection.peer_id,
485 result: Box::new(std::io::Error::new(e.kind(), e.to_string())),
486 }))
487 }
488 _ => {}
489 }
490 }
491 }
492
493 fn on_listener_error(&mut self, ListenerError { listener_id, err }: ListenerError) {
494 let Some(connection) =
495 self.connections
496 .values_mut()
497 .flatten()
498 .find(|connection| match connection.candidacy {
499 Candidate::Confirmed {
500 listener_id: Some(id),
501 ..
502 } => id == listener_id,
503 _ => false,
504 })
505 else {
506 return;
507 };
508
509 if let Candidate::Confirmed {
510 listener_id,
511 addresses,
512 } = &mut connection.candidacy
513 {
514 listener_id.take();
515 let addrs = std::mem::take(addresses);
516 for addr in addrs {
517 self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
518 }
519 self.events
520 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
521 peer_id: connection.peer_id,
522 result: Box::new(std::io::Error::new(
523 std::io::ErrorKind::Other,
524 err.to_string(),
525 )),
526 }))
527 }
528 }
529
530 fn on_listener_expired(&mut self, ExpiredListenAddr { listener_id, addr }: ExpiredListenAddr) {
531 let Some(connection) =
532 self.connections
533 .values_mut()
534 .flatten()
535 .find(|connection| match connection.candidacy {
536 Candidate::Confirmed {
537 listener_id: Some(id),
538 ..
539 } => id == listener_id,
540 _ => false,
541 })
542 else {
543 return;
544 };
545
546 if let Candidate::Confirmed { addresses, .. } = &mut connection.candidacy {
547 if !addresses.contains(addr) {
548 return;
549 }
550
551 addresses.retain(|a| a != addr);
552
553 self.events
554 .push_back(ToSwarm::ExternalAddrExpired(addr.clone()));
555 }
556 }
557
558 fn on_address_change(
559 &mut self,
560 AddressChange {
561 peer_id,
562 connection_id,
563 old,
564 new,
565 }: AddressChange,
566 ) {
567 let Some(connections) = self.connections.get_mut(&peer_id) else {
568 return;
569 };
570
571 let Some(connection) = connections
572 .iter_mut()
573 .find(|connection| connection.id == connection_id)
574 else {
575 return;
576 };
577
578 let old_addr = match old {
579 libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
580 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
581 };
582
583 let new_addr = match new {
584 libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
585 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
586 };
587
588 if old_addr == new_addr {
589 return;
590 }
591
592 connection.addr = new_addr.clone();
593 }
594
595 fn on_dial_failure(
596 &mut self,
597 DialFailure {
598 peer_id,
599 error,
600 connection_id,
601 }: DialFailure,
602 ) {
603 if !self.pending_connection.remove(&connection_id) {
604 return;
605 }
606
607 let Some(peer_id) = peer_id else {
608 return;
609 };
610
611 self.events
612 .push_back(ToSwarm::GenerateEvent(Event::ReservationFailure {
613 peer_id,
614 result: Box::new(std::io::Error::new(
615 std::io::ErrorKind::Other,
616 error.to_string(),
617 )),
618 }));
619
620 }
640
641 fn on_connection_established(
642 &mut self,
643 ConnectionEstablished {
644 peer_id,
645 connection_id,
646 endpoint,
647 ..
648 }: ConnectionEstablished,
649 ) {
650 self.pending_connection.remove(&connection_id);
651
652 let addr = match endpoint {
653 libp2p::core::ConnectedPoint::Dialer { address, .. } => address,
654 libp2p::core::ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
655 };
656
657 match self.relays.entry(peer_id) {
658 Entry::Occupied(entry) => {
659 let mut addr = addr.clone();
660 addr.pop();
661
662 if !entry.get().contains(&addr) {
663 return;
664 }
665 }
666 Entry::Vacant(_) if self.config.auto_connect => {}
667 _ => return,
668 };
669
670 let connection = Connection {
671 peer_id,
672 id: connection_id,
673 addr: addr.clone(),
674 candidacy: Candidate::Pending,
675 rtt: None,
676 };
677
678 self.connections
679 .entry(peer_id)
680 .or_default()
681 .push(connection);
682
683 if self.pending_selection.remove(&peer_id) {
684 self.select(peer_id);
685 }
686 }
687
688 fn on_connection_closed(
689 &mut self,
690 ConnectionClosed {
691 peer_id,
692 connection_id,
693 ..
694 }: ConnectionClosed<'_>,
695 ) {
696 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
697 let connections = entry.get_mut();
698 let Some(connection) = connections
699 .iter_mut()
700 .find(|connection| connection.id == connection_id)
701 else {
702 return;
703 };
704
705 if let Candidate::Confirmed {
709 listener_id,
710 addresses,
711 } = &mut connection.candidacy
712 {
713 if let Some(listener_id) = listener_id.take() {
714 let addrs = std::mem::take(addresses);
715 for addr in addrs {
716 self.events.push_back(ToSwarm::ExternalAddrExpired(addr));
717 }
718 self.events
719 .push_back(ToSwarm::RemoveListener { id: listener_id });
720
721 self.events
722 .push_back(ToSwarm::GenerateEvent(Event::ReservationClosed {
723 peer_id: connection.peer_id,
724 result: Ok(()),
725 }))
726 }
727 }
728
729 connections.retain(|connection| connection.id != connection_id);
730
731 if connections.is_empty() {
732 entry.remove();
733 }
734 }
735 }
736
737 pub fn process_relay_event(&mut self, _: libp2p::relay::client::Event) {
738 }
749}
750
751impl NetworkBehaviour for Behaviour {
752 type ToSwarm = Event;
753 type ConnectionHandler = handler::Handler;
754
755 fn handle_established_inbound_connection(
756 &mut self,
757 _connection_id: ConnectionId,
758 _peer: PeerId,
759 _local_addr: &Multiaddr,
760 _remote_addr: &Multiaddr,
761 ) -> Result<THandler<Self>, ConnectionDenied> {
762 Ok(handler::Handler::default())
763 }
764
765 fn handle_established_outbound_connection(
766 &mut self,
767 _connection_id: ConnectionId,
768 _peer: PeerId,
769 _addr: &Multiaddr,
770 _role_override: Endpoint,
771 _: PortUse,
772 ) -> Result<THandler<Self>, ConnectionDenied> {
773 Ok(handler::Handler::default())
774 }
775
776 fn handle_pending_outbound_connection(
777 &mut self,
778 _: ConnectionId,
779 maybe_peer: Option<PeerId>,
780 _: &[Multiaddr],
781 _: Endpoint,
782 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
783 let addrs = maybe_peer
784 .and_then(|peer_id| self.relays.get(&peer_id))
785 .cloned()
786 .unwrap_or_default();
787
788 Ok(addrs)
789 }
790
791 fn on_swarm_event(&mut self, event: FromSwarm) {
792 match event {
793 FromSwarm::ConnectionEstablished(event) => self.on_connection_established(event),
794 FromSwarm::ConnectionClosed(event) => self.on_connection_closed(event),
795 FromSwarm::NewListenAddr(event) => self.on_listen_on(event),
796 FromSwarm::ListenerClosed(event) => self.on_listener_close(event),
797 FromSwarm::DialFailure(event) => self.on_dial_failure(event),
798 FromSwarm::ListenerError(event) => self.on_listener_error(event),
799 FromSwarm::ExpiredListenAddr(event) => self.on_listener_expired(event),
800 FromSwarm::AddressChange(event) => self.on_address_change(event),
801 _ => {}
802 }
803 }
804
805 fn on_connection_handler_event(
806 &mut self,
807 peer_id: PeerId,
808 connection_id: ConnectionId,
809 event: THandlerOutEvent<Self>,
810 ) {
811 match event {
812 handler::Out::Supported => {
813 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
814 let list = entry.get_mut();
815 if let Some(connection) = list
816 .iter_mut()
817 .find(|connection| connection.id == connection_id)
818 {
819 let canadate_state = &mut connection.candidacy;
820
821 if matches!(canadate_state, Candidate::Pending | Candidate::Unsupported) {
822 *canadate_state = Candidate::Confirmed {
823 listener_id: None,
824 addresses: vec![],
825 };
826 if self.pending_selection.remove(&peer_id) {
827 self.select(peer_id);
828 }
829 }
830 }
831 }
832 }
833 handler::Out::Unsupported => {
834 if let Entry::Occupied(mut entry) = self.connections.entry(peer_id) {
835 let list = entry.get_mut();
836 if let Some(connection) = list
837 .iter_mut()
838 .find(|connection| connection.id == connection_id)
839 {
840 let canadate_state = &mut connection.candidacy;
841
842 if let Candidate::Confirmed {
843 listener_id: Some(id),
844 ..
845 } = canadate_state
846 {
847 let id = *id;
848 self.events.push_back(ToSwarm::RemoveListener { id });
849 }
850
851 *canadate_state = Candidate::Unsupported;
852 self.pending_selection.remove(&peer_id);
853 }
854 }
855 }
856 }
857 }
858
859 fn poll(
860 &mut self,
861 cx: &mut Context<'_>,
862 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
863 if let Some(event) = self.events.pop_front() {
864 return Poll::Ready(event);
865 }
866
867 self.discovery_channel
868 .retain(|_, rx| match rx.poll_next_unpin(cx) {
869 Poll::Ready(Some(list)) => {
870 for peer_id in list {
871 self.relays.entry(peer_id).or_default();
872 }
873 false
874 }
875 Poll::Ready(None) => false,
876 Poll::Pending => true,
877 });
878
879 Poll::Pending
880 }
881}