forest/libp2p/hello/
behaviour.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use std::{
5    sync::Arc,
6    task::Poll,
7    time::{Duration, Instant},
8};
9
10use ahash::HashMap;
11use libp2p::{
12    PeerId,
13    request_response::{self, OutboundRequestId, ProtocolSupport, ResponseChannel},
14    swarm::{CloseConnection, NetworkBehaviour, THandlerOutEvent, derive_prelude::*},
15};
16use tracing::warn;
17
18use super::*;
19use crate::libp2p::{PeerManager, service::metrics};
20
21type InnerBehaviour = request_response::Behaviour<HelloCodec>;
22
23pub struct HelloBehaviour {
24    inner: InnerBehaviour,
25    response_channels: HashMap<OutboundRequestId, flume::Sender<HelloResponse>>,
26    pending_inbound_hello_peers: HashMap<PeerId, Instant>,
27    peer_manager: Arc<PeerManager>,
28}
29
30impl HelloBehaviour {
31    pub fn new(cfg: request_response::Config, peer_manager: Arc<PeerManager>) -> Self {
32        Self {
33            inner: InnerBehaviour::new([(HELLO_PROTOCOL_NAME, ProtocolSupport::Full)], cfg),
34            response_channels: Default::default(),
35            pending_inbound_hello_peers: Default::default(),
36            peer_manager,
37        }
38    }
39
40    pub fn send_request(
41        &mut self,
42        peer: &PeerId,
43        request: HelloRequest,
44        response_channel: flume::Sender<HelloResponse>,
45    ) -> OutboundRequestId {
46        let request_id = self.inner.send_request(peer, request);
47        self.response_channels.insert(request_id, response_channel);
48        self.track_metrics();
49        request_id
50    }
51
52    pub fn send_response(
53        &mut self,
54        channel: ResponseChannel<HelloResponse>,
55        response: HelloResponse,
56    ) -> Result<(), HelloResponse> {
57        self.inner.send_response(channel, response)
58    }
59
60    pub async fn handle_response(
61        &mut self,
62        request_id: &OutboundRequestId,
63        response: HelloResponse,
64    ) {
65        if let Some(channel) = self.response_channels.remove(request_id) {
66            self.track_metrics();
67            if let Err(err) = channel.send_async(response).await {
68                warn!("{err}");
69            }
70        }
71    }
72
73    pub fn on_outbound_failure(&mut self, request_id: &OutboundRequestId) {
74        if self.response_channels.remove(request_id).is_some() {
75            self.track_metrics();
76        }
77    }
78
79    fn track_metrics(&self) {
80        metrics::NETWORK_CONTAINER_CAPACITIES
81            .get_or_create(&metrics::values::HELLO_REQUEST_TABLE)
82            .set(self.response_channels.capacity() as _);
83    }
84}
85
86impl NetworkBehaviour for HelloBehaviour {
87    type ConnectionHandler = <InnerBehaviour as NetworkBehaviour>::ConnectionHandler;
88
89    type ToSwarm = <InnerBehaviour as NetworkBehaviour>::ToSwarm;
90
91    fn handle_established_inbound_connection(
92        &mut self,
93        connection_id: ConnectionId,
94        peer: PeerId,
95        local_addr: &libp2p::Multiaddr,
96        remote_addr: &libp2p::Multiaddr,
97    ) -> Result<THandler<Self>, ConnectionDenied> {
98        self.inner.handle_established_inbound_connection(
99            connection_id,
100            peer,
101            local_addr,
102            remote_addr,
103        )
104    }
105
106    fn handle_established_outbound_connection(
107        &mut self,
108        connection_id: ConnectionId,
109        peer: PeerId,
110        addr: &Multiaddr,
111        role_override: libp2p::core::Endpoint,
112        port_use: PortUse,
113    ) -> Result<THandler<Self>, ConnectionDenied> {
114        self.inner.handle_established_outbound_connection(
115            connection_id,
116            peer,
117            addr,
118            role_override,
119            port_use,
120        )
121    }
122
123    fn handle_pending_inbound_connection(
124        &mut self,
125        connection_id: ConnectionId,
126        local_addr: &libp2p::Multiaddr,
127        remote_addr: &libp2p::Multiaddr,
128    ) -> Result<(), ConnectionDenied> {
129        self.inner
130            .handle_pending_inbound_connection(connection_id, local_addr, remote_addr)
131    }
132
133    fn handle_pending_outbound_connection(
134        &mut self,
135        connection_id: ConnectionId,
136        maybe_peer: Option<PeerId>,
137        addresses: &[libp2p::Multiaddr],
138        effective_role: libp2p::core::Endpoint,
139    ) -> Result<Vec<libp2p::Multiaddr>, ConnectionDenied> {
140        self.inner.handle_pending_outbound_connection(
141            connection_id,
142            maybe_peer,
143            addresses,
144            effective_role,
145        )
146    }
147
148    fn on_connection_handler_event(
149        &mut self,
150        peer_id: PeerId,
151        connection_id: ConnectionId,
152        event: THandlerOutEvent<Self>,
153    ) {
154        self.inner
155            .on_connection_handler_event(peer_id, connection_id, event)
156    }
157
158    fn on_swarm_event(&mut self, event: FromSwarm) {
159        if let FromSwarm::ConnectionEstablished(e) = &event
160            && e.other_established == 0
161        {
162            self.pending_inbound_hello_peers
163                .insert(e.peer_id, Instant::now());
164        }
165
166        self.inner.on_swarm_event(event)
167    }
168
169    fn poll(
170        &mut self,
171        cx: &mut std::task::Context<'_>,
172    ) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
173        if let Poll::Ready(ev) = self.inner.poll(cx) {
174            // Remove a peer from `pending_inbound_hello_peers` when its hello request is received.
175            if let ToSwarm::GenerateEvent(request_response::Event::Message {
176                peer,
177                message:
178                    request_response::Message::Request {
179                        request: HelloRequest { .. },
180                        ..
181                    },
182                ..
183            }) = &ev
184            {
185                self.pending_inbound_hello_peers.remove(peer);
186            }
187
188            return Poll::Ready(ev);
189        }
190
191        // Disconnect peers whose hello request are not received after a TIMEOUT
192        const INBOUND_HELLO_WAIT_TIMEOUT: Duration = Duration::from_secs(30);
193        let now = Instant::now();
194        if let Some((&peer_to_disconnect, _)) =
195            self.pending_inbound_hello_peers
196                .iter()
197                .find(|&(_, &connected_instant)| {
198                    now.duration_since(connected_instant) > INBOUND_HELLO_WAIT_TIMEOUT
199                })
200        {
201            self.pending_inbound_hello_peers.remove(&peer_to_disconnect);
202            if !self.peer_manager.is_peer_protected(&peer_to_disconnect) {
203                tracing::debug!(peer=%peer_to_disconnect, "Disconnecting peer for not receiving hello in 30s");
204                return Poll::Ready(ToSwarm::CloseConnection {
205                    peer_id: peer_to_disconnect,
206                    connection: CloseConnection::All,
207                });
208            }
209        }
210
211        Poll::Pending
212    }
213}