1use std::{
5 sync::Arc,
6 task::Poll,
7 time::{Duration, Instant},
8};
9
10use ahash::HashMap;
11use libp2p::{
12 PeerId,
13 request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
14 swarm::{CloseConnection, NetworkBehaviour, THandlerOutEvent, derive_prelude::*},
15};
16use tracing::warn;
17
18use super::*;
19use crate::libp2p::{PeerManager, service::metrics};
20
21type InnerBehaviour = request_response::Behaviour<HelloCodec>;
22
23pub struct HelloBehaviour {
24 inner: InnerBehaviour,
25 response_channels: HashMap<OutboundRequestId, flume::Sender<HelloResponse>>,
26 pending_inbound_hello_peers: HashMap<PeerId, Instant>,
27 peer_manager: Arc<PeerManager>,
28}
29
30impl HelloBehaviour {
31 pub fn new(cfg: request_response::Config, peer_manager: Arc<PeerManager>) -> Self {
32 Self {
33 inner: InnerBehaviour::new([(HELLO_PROTOCOL_NAME, ProtocolSupport::Full)], cfg),
34 response_channels: Default::default(),
35 pending_inbound_hello_peers: Default::default(),
36 peer_manager,
37 }
38 }
39
40 pub fn send_request(
41 &mut self,
42 peer: &PeerId,
43 request: HelloRequest,
44 response_channel: flume::Sender<HelloResponse>,
45 ) -> OutboundRequestId {
46 let request_id = self.inner.send_request(peer, request);
47 self.response_channels.insert(request_id, response_channel);
48 self.track_metrics();
49 request_id
50 }
51
52 pub fn send_response(
53 &mut self,
54 channel: ResponseChannel<HelloResponse>,
55 response: HelloResponse,
56 ) -> Result<(), HelloResponse> {
57 self.inner.send_response(channel, response)
58 }
59
60 pub async fn handle_response(
61 &mut self,
62 request_id: &OutboundRequestId,
63 response: HelloResponse,
64 ) {
65 if let Some(channel) = self.response_channels.remove(request_id) {
66 self.track_metrics();
67 if let Err(err) = channel.send_async(response).await {
68 warn!("{err}");
69 }
70 }
71 }
72
73 pub fn on_outbound_failure(&mut self, request_id: &OutboundRequestId) {
74 if self.response_channels.remove(request_id).is_some() {
75 self.track_metrics();
76 }
77 }
78
79 fn track_metrics(&self) {
80 metrics::NETWORK_CONTAINER_CAPACITIES
81 .get_or_create(&metrics::values::HELLO_REQUEST_TABLE)
82 .set(self.response_channels.capacity() as _);
83 }
84}
85
86impl NetworkBehaviour for HelloBehaviour {
87 type ConnectionHandler = <InnerBehaviour as NetworkBehaviour>::ConnectionHandler;
88
89 type ToSwarm = <InnerBehaviour as NetworkBehaviour>::ToSwarm;
90
91 fn handle_established_inbound_connection(
92 &mut self,
93 connection_id: ConnectionId,
94 peer: PeerId,
95 local_addr: &libp2p::Multiaddr,
96 remote_addr: &libp2p::Multiaddr,
97 ) -> Result<THandler<Self>, ConnectionDenied> {
98 self.inner.handle_established_inbound_connection(
99 connection_id,
100 peer,
101 local_addr,
102 remote_addr,
103 )
104 }
105
106 fn handle_established_outbound_connection(
107 &mut self,
108 connection_id: ConnectionId,
109 peer: PeerId,
110 addr: &Multiaddr,
111 role_override: libp2p::core::Endpoint,
112 port_use: PortUse,
113 ) -> Result<THandler<Self>, ConnectionDenied> {
114 self.inner.handle_established_outbound_connection(
115 connection_id,
116 peer,
117 addr,
118 role_override,
119 port_use,
120 )
121 }
122
123 fn handle_pending_inbound_connection(
124 &mut self,
125 connection_id: ConnectionId,
126 local_addr: &libp2p::Multiaddr,
127 remote_addr: &libp2p::Multiaddr,
128 ) -> Result<(), ConnectionDenied> {
129 self.inner
130 .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
131 }
132
133 fn handle_pending_outbound_connection(
134 &mut self,
135 connection_id: ConnectionId,
136 maybe_peer: Option<PeerId>,
137 addresses: &[libp2p::Multiaddr],
138 effective_role: libp2p::core::Endpoint,
139 ) -> Result<Vec<libp2p::Multiaddr>, ConnectionDenied> {
140 self.inner.handle_pending_outbound_connection(
141 connection_id,
142 maybe_peer,
143 addresses,
144 effective_role,
145 )
146 }
147
148 fn on_connection_handler_event(
149 &mut self,
150 peer_id: PeerId,
151 connection_id: ConnectionId,
152 event: THandlerOutEvent<Self>,
153 ) {
154 self.inner
155 .on_connection_handler_event(peer_id, connection_id, event)
156 }
157
158 fn on_swarm_event(&mut self, event: FromSwarm) {
159 if let FromSwarm::ConnectionEstablished(e) = &event
160 && e.other_established == 0
161 {
162 self.pending_inbound_hello_peers
163 .insert(e.peer_id, Instant::now());
164 }
165
166 self.inner.on_swarm_event(event)
167 }
168
169 fn poll(
170 &mut self,
171 cx: &mut std::task::Context<'_>,
172 ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
173 if let Poll::Ready(ev) = self.inner.poll(cx) {
174 if let ToSwarm::GenerateEvent(request_response::Event::Message {
176 peer,
177 message:
178 request_response::Message::Request {
179 request: HelloRequest { .. },
180 ..
181 },
182 ..
183 }) = &ev
184 {
185 self.pending_inbound_hello_peers.remove(peer);
186 }
187
188 return Poll::Ready(ev);
189 }
190
191 const INBOUND_HELLO_WAIT_TIMEOUT: Duration = Duration::from_secs(30);
193 let now = Instant::now();
194 if let Some((&peer_to_disconnect, _)) =
195 self.pending_inbound_hello_peers
196 .iter()
197 .find(|&(_, &connected_instant)| {
198 now.duration_since(connected_instant) > INBOUND_HELLO_WAIT_TIMEOUT
199 })
200 {
201 self.pending_inbound_hello_peers.remove(&peer_to_disconnect);
202 if !self.peer_manager.is_peer_protected(&peer_to_disconnect) {
203 tracing::debug!(peer=%peer_to_disconnect, "Disconnecting peer for not receiving hello in 30s");
204 return Poll::Ready(ToSwarm::CloseConnection {
205 peer_id: peer_to_disconnect,
206 connection: CloseConnection::All,
207 });
208 }
209 }
210
211 Poll::Pending
212 }
213}