rust_ipfs/p2p/
peerbook.rs

1use core::task::{Context, Poll};
2use libp2p::core::{Endpoint, Multiaddr};
3use libp2p::identify::Info;
4use libp2p::swarm::derive_prelude::ConnectionEstablished;
5use libp2p::swarm::{self, dummy::ConnectionHandler as DummyConnectionHandler, NetworkBehaviour};
6use libp2p::swarm::{
7    ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, THandler, THandlerInEvent, ToSwarm,
8};
9use libp2p::PeerId;
10use std::collections::hash_map::Entry;
11use std::time::Duration;
12
13use libp2p::core::transport::PortUse;
14use std::collections::{HashMap, VecDeque};
15
16#[derive(Default, Debug)]
17pub struct Behaviour {
18    events: VecDeque<ToSwarm<<Self as NetworkBehaviour>::ToSwarm, THandlerInEvent<Self>>>,
19    peer_info: HashMap<PeerId, Info>,
20    peer_rtt: HashMap<PeerId, [Duration; 3]>,
21    peer_connections: HashMap<PeerId, Vec<(ConnectionId, Multiaddr)>>,
22}
23
24impl Behaviour {
25    pub fn inject_peer_info(&mut self, info: Info) {
26        let peer_id = info.public_key.to_peer_id();
27        self.peer_info.insert(peer_id, info);
28    }
29
30    pub fn peers(&self) -> impl Iterator<Item = &PeerId> {
31        self.peer_connections.keys()
32    }
33
34    pub fn connected_peers_addrs(&self) -> impl Iterator<Item = (PeerId, Vec<Multiaddr>)> + '_ {
35        self.peer_connections.iter().map(|(peer_id, list)| {
36            let list = list
37                .iter()
38                .map(|(_, addr)| addr)
39                .cloned()
40                .collect::<Vec<_>>();
41            (*peer_id, list)
42        })
43    }
44
45    pub fn set_peer_rtt(&mut self, peer_id: PeerId, rtt: Duration) {
46        self.peer_rtt
47            .entry(peer_id)
48            .and_modify(|r| {
49                r.rotate_left(1);
50                r[2] = rtt;
51            })
52            .or_insert([Duration::from_millis(0), Duration::from_millis(0), rtt]);
53    }
54
55    pub fn get_peer_rtt(&self, peer_id: PeerId) -> Option<[Duration; 3]> {
56        self.peer_rtt.get(&peer_id).copied()
57    }
58
59    pub fn get_peer_latest_rtt(&self, peer_id: PeerId) -> Option<Duration> {
60        self.get_peer_rtt(peer_id).map(|rtt| rtt[2])
61    }
62
63    pub fn get_peer_info(&self, peer_id: PeerId) -> Option<&Info> {
64        self.peer_info.get(&peer_id)
65    }
66
67    pub fn remove_peer_info(&mut self, peer_id: PeerId) {
68        self.peer_info.remove(&peer_id);
69    }
70
71    pub fn peer_connections(&self, peer_id: PeerId) -> Option<Vec<Multiaddr>> {
72        self.peer_connections
73            .get(&peer_id)
74            .map(|list| list.iter().map(|(_, addr)| addr).cloned().collect())
75    }
76}
77
78impl NetworkBehaviour for Behaviour {
79    type ConnectionHandler = DummyConnectionHandler;
80    type ToSwarm = void::Void;
81
82    fn handle_pending_inbound_connection(
83        &mut self,
84        _: ConnectionId,
85        _: &Multiaddr,
86        _: &Multiaddr,
87    ) -> Result<(), ConnectionDenied> {
88        Ok(())
89    }
90
91    fn handle_pending_outbound_connection(
92        &mut self,
93        _: ConnectionId,
94        _: Option<PeerId>,
95        _: &[Multiaddr],
96        _: Endpoint,
97    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
98        Ok(vec![])
99    }
100
101    fn handle_established_inbound_connection(
102        &mut self,
103        _: ConnectionId,
104        _: PeerId,
105        _: &Multiaddr,
106        _: &Multiaddr,
107    ) -> Result<THandler<Self>, ConnectionDenied> {
108        Ok(DummyConnectionHandler)
109    }
110
111    fn handle_established_outbound_connection(
112        &mut self,
113        _: ConnectionId,
114        _: PeerId,
115        _: &Multiaddr,
116        _: Endpoint,
117        _: PortUse,
118    ) -> Result<THandler<Self>, ConnectionDenied> {
119        Ok(DummyConnectionHandler)
120    }
121
122    fn on_connection_handler_event(
123        &mut self,
124        _: libp2p::PeerId,
125        _: swarm::ConnectionId,
126        _: swarm::THandlerOutEvent<Self>,
127    ) {
128    }
129
130    fn on_swarm_event(&mut self, event: FromSwarm) {
131        match event {
132            FromSwarm::ConnectionEstablished(ConnectionEstablished {
133                peer_id,
134                connection_id,
135                endpoint,
136                ..
137            }) => {
138                let multiaddr = endpoint.get_remote_address().clone();
139                self.peer_connections
140                    .entry(peer_id)
141                    .or_default()
142                    .push((connection_id, multiaddr));
143            }
144            FromSwarm::ConnectionClosed(ConnectionClosed {
145                peer_id,
146                connection_id,
147                remaining_established,
148                ..
149            }) => {
150                if let Entry::Occupied(mut entry) = self.peer_connections.entry(peer_id) {
151                    let list = entry.get_mut();
152
153                    list.retain(|(id, _)| *id != connection_id);
154
155                    if list.is_empty() {
156                        entry.remove();
157                    }
158                }
159
160                if remaining_established == 0 {
161                    self.peer_rtt.remove(&(peer_id));
162                    self.peer_info.remove(&peer_id);
163                }
164            }
165
166            _ => {}
167        }
168    }
169
170    fn poll(&mut self, _: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
171        if let Some(event) = self.events.pop_front() {
172            return Poll::Ready(event);
173        }
174
175        Poll::Pending
176    }
177}