ant_libp2p_memory_connection_limits/
lib.rs

1// Copyright 2023 Protocol Labs.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use ant_libp2p_core as libp2p_core;
22use ant_libp2p_swarm as libp2p_swarm;
23
24use std::{
25    convert::Infallible,
26    fmt,
27    task::{Context, Poll},
28    time::{Duration, Instant},
29};
30
31use libp2p_core::{transport::PortUse, Endpoint, Multiaddr};
32use libp2p_identity::PeerId;
33use libp2p_swarm::{
34    dummy, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent,
35    THandlerOutEvent, ToSwarm,
36};
37use sysinfo::MemoryRefreshKind;
38
39/// A [`NetworkBehaviour`] that enforces a set of memory usage based limits.
40///
41/// For these limits to take effect, this needs to be composed
42/// into the behaviour tree of your application.
43///
44/// If a connection is denied due to a limit, either a
45/// [`SwarmEvent::IncomingConnectionError`](libp2p_swarm::SwarmEvent::IncomingConnectionError)
46/// or [`SwarmEvent::OutgoingConnectionError`](libp2p_swarm::SwarmEvent::OutgoingConnectionError)
47/// will be emitted. The [`ListenError::Denied`](libp2p_swarm::ListenError::Denied) and respectively
48/// the [`DialError::Denied`](libp2p_swarm::DialError::Denied) variant
49/// contain a [`ConnectionDenied`] type that can be downcast to [`MemoryUsageLimitExceeded`] error
50/// if (and only if) **this** behaviour denied the connection.
51///
52/// If you employ multiple [`NetworkBehaviour`]s that manage connections,
53/// it may also be a different error.
54///
55/// [Behaviour::with_max_bytes] and [Behaviour::with_max_percentage] are mutually exclusive.
56/// If you need to employ both of them,
57/// compose two instances of [Behaviour] into your custom behaviour.
58///
59/// # Example
60///
61/// ```rust
62/// # use libp2p_identify as identify;
63/// # use libp2p_swarm_derive::NetworkBehaviour;
64/// # use libp2p_memory_connection_limits as memory_connection_limits;
65///
66/// #[derive(NetworkBehaviour)]
67/// # #[behaviour(prelude = "libp2p_swarm::derive_prelude")]
68/// struct MyBehaviour {
69///     identify: identify::Behaviour,
70///     limits: memory_connection_limits::Behaviour,
71/// }
72/// ```
73pub struct Behaviour {
74    max_allowed_bytes: usize,
75    process_physical_memory_bytes: usize,
76    last_refreshed: Instant,
77}
78
79/// The maximum duration for which the retrieved memory-stats
80/// of the process are allowed to be stale.
81///
82/// Once exceeded, we will retrieve new stats.
83const MAX_STALE_DURATION: Duration = Duration::from_millis(100);
84
85impl Behaviour {
86    /// Sets the process memory usage threshold in absolute bytes.
87    ///
88    /// New inbound and outbound connections will be denied when the threshold is reached.
89    pub fn with_max_bytes(max_allowed_bytes: usize) -> Self {
90        Self {
91            max_allowed_bytes,
92            process_physical_memory_bytes: memory_stats::memory_stats()
93                .map(|s| s.physical_mem)
94                .unwrap_or_default(),
95            last_refreshed: Instant::now(),
96        }
97    }
98
99    /// Sets the process memory usage threshold in the percentage of the total physical memory.
100    ///
101    /// New inbound and outbound connections will be denied when the threshold is reached.
102    pub fn with_max_percentage(percentage: f64) -> Self {
103        use sysinfo::{RefreshKind, System};
104
105        let system_memory_bytes = System::new_with_specifics(
106            RefreshKind::new().with_memory(MemoryRefreshKind::new().with_ram()),
107        )
108        .total_memory();
109
110        Self::with_max_bytes((system_memory_bytes as f64 * percentage).round() as usize)
111    }
112
113    /// Gets the process memory usage threshold in bytes.
114    pub fn max_allowed_bytes(&self) -> usize {
115        self.max_allowed_bytes
116    }
117
118    fn check_limit(&mut self) -> Result<(), ConnectionDenied> {
119        self.refresh_memory_stats_if_needed();
120
121        if self.process_physical_memory_bytes > self.max_allowed_bytes {
122            return Err(ConnectionDenied::new(MemoryUsageLimitExceeded {
123                process_physical_memory_bytes: self.process_physical_memory_bytes,
124                max_allowed_bytes: self.max_allowed_bytes,
125            }));
126        }
127
128        Ok(())
129    }
130
131    fn refresh_memory_stats_if_needed(&mut self) {
132        let now = Instant::now();
133
134        if self.last_refreshed + MAX_STALE_DURATION > now {
135            // Memory stats are reasonably recent, don't refresh.
136            return;
137        }
138
139        let Some(stats) = memory_stats::memory_stats() else {
140            tracing::warn!("Failed to retrieve process memory stats");
141            return;
142        };
143
144        self.last_refreshed = now;
145        self.process_physical_memory_bytes = stats.physical_mem;
146    }
147}
148
149impl NetworkBehaviour for Behaviour {
150    type ConnectionHandler = dummy::ConnectionHandler;
151    type ToSwarm = Infallible;
152
153    fn handle_pending_inbound_connection(
154        &mut self,
155        _: ConnectionId,
156        _: &Multiaddr,
157        _: &Multiaddr,
158    ) -> Result<(), ConnectionDenied> {
159        self.check_limit()
160    }
161
162    fn handle_established_inbound_connection(
163        &mut self,
164        _: ConnectionId,
165        _: PeerId,
166        _: &Multiaddr,
167        _: &Multiaddr,
168    ) -> Result<THandler<Self>, ConnectionDenied> {
169        Ok(dummy::ConnectionHandler)
170    }
171
172    fn handle_pending_outbound_connection(
173        &mut self,
174        _: ConnectionId,
175        _: Option<PeerId>,
176        _: &[Multiaddr],
177        _: Endpoint,
178    ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
179        self.check_limit()?;
180        Ok(vec![])
181    }
182
183    fn handle_established_outbound_connection(
184        &mut self,
185        _: ConnectionId,
186        _: PeerId,
187        _: &Multiaddr,
188        _: Endpoint,
189        _: PortUse,
190    ) -> Result<THandler<Self>, ConnectionDenied> {
191        Ok(dummy::ConnectionHandler)
192    }
193
194    fn on_swarm_event(&mut self, _: FromSwarm) {}
195
196    fn on_connection_handler_event(
197        &mut self,
198        _id: PeerId,
199        _: ConnectionId,
200        event: THandlerOutEvent<Self>,
201    ) {
202        // TODO: remove when Rust 1.82 is MSRV
203        #[allow(unreachable_patterns)]
204        libp2p_core::util::unreachable(event)
205    }
206
207    fn poll(&mut self, _: &mut Context<'_>) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
208        Poll::Pending
209    }
210}
211
212/// A connection limit has been exceeded.
213#[derive(Debug, Clone, Copy)]
214pub struct MemoryUsageLimitExceeded {
215    process_physical_memory_bytes: usize,
216    max_allowed_bytes: usize,
217}
218
219impl MemoryUsageLimitExceeded {
220    pub fn process_physical_memory_bytes(&self) -> usize {
221        self.process_physical_memory_bytes
222    }
223
224    pub fn max_allowed_bytes(&self) -> usize {
225        self.max_allowed_bytes
226    }
227}
228
229impl std::error::Error for MemoryUsageLimitExceeded {}
230
231impl fmt::Display for MemoryUsageLimitExceeded {
232    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
233        write!(
234            f,
235            "process physical memory usage limit exceeded: process memory: {} bytes, max allowed: {} bytes",
236            self.process_physical_memory_bytes,
237            self.max_allowed_bytes,
238        )
239    }
240}