Skip to main content

running_process/broker/server/
fd_pressure.rs

1//! FD-pressure self-demotion for the broker accept path (#390).
2//!
3//! When `accept()` fails with EMFILE/ENFILE the broker is out of file
4//! descriptors. Instead of failing accepts opaquely, the broker demotes
5//! itself: connections that do get through receive a structured
6//! `Refused` reply carrying the reserved `ERROR_FD_PRESSURE` code (slot 9
7//! in the frozen v1 envelope), and admin verbs keep working so operators
8//! can see the demoted state in `status --json`. The guard recovers
9//! automatically once a configurable streak of accepts succeeds again.
10//!
11//! The guard is a small pure state machine behind interior mutability so
12//! the accept loop, the admin snapshot provider, and tests can all share
13//! one instance without real fd exhaustion.
14
15use std::io;
16use std::sync::Mutex;
17
18use crate::broker::protocol::{ErrorCode, HelloReply};
19
20use super::connection::refused_reply;
21
/// Default length of the run of back-to-back successful `accept()` calls
/// that lifts an fd-pressure demotion.
pub const DEFAULT_FD_PRESSURE_RECOVERY_ACCEPTS: u32 = 3;

/// Default `retry_after_ms` hint (one second) placed in the `Refused` reply
/// sent to Hello clients while the broker is demoted.
pub const DEFAULT_FD_PRESSURE_RETRY_AFTER_MS: u64 = 1_000;
26
/// Verdict produced by [`FdPressureGuard::on_accept_error`] for one failed
/// `accept()` call.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FdPressureDecision {
    /// The failure was fd exhaustion; after this call the guard is in the
    /// demoted state, whether or not it already was.
    Demoted,
    /// The failure had nothing to do with fd pressure. The guard state is left
    /// untouched and the caller should apply its normal error handling.
    Unrelated,
}
35
/// Knobs controlling how an [`FdPressureGuard`] demotes and recovers.
#[derive(Clone, Copy, Debug)]
pub struct FdPressureConfig {
    /// How many consecutive successful accepts are needed to lift a demotion.
    /// A value of `0` behaves like `1`: the first successful accept while
    /// demoted clears the demotion.
    pub recovery_accepts: u32,
    /// Value placed in `Refused.retry_after_ms` for clients turned away while
    /// the broker is demoted.
    pub retry_after_ms: u64,
}
44
45impl Default for FdPressureConfig {
46    fn default() -> Self {
47        Self {
48            recovery_accepts: DEFAULT_FD_PRESSURE_RECOVERY_ACCEPTS,
49            retry_after_ms: DEFAULT_FD_PRESSURE_RETRY_AFTER_MS,
50        }
51    }
52}
53
/// Mutable bookkeeping kept behind the mutex of an [`FdPressureGuard`].
#[derive(Clone, Copy, Debug, Default)]
struct GuardState {
    /// Whether new Hello connections are currently being refused.
    demoted: bool,
    /// Successful accepts seen in the current recovery streak. Reset on every
    /// fd-exhaustion error and when a demotion clears.
    consecutive_ok: u32,
    /// Number of healthy -> demoted transitions since creation.
    demotions_total: u64,
    /// Number of refusal replies built so far.
    refused_while_demoted: u64,
}
61
62/// Shared fd-pressure demotion state machine.
63#[derive(Debug, Default)]
64pub struct FdPressureGuard {
65    config: FdPressureConfig,
66    state: Mutex<GuardState>,
67}
68
69impl FdPressureGuard {
70    /// Build a guard with explicit tunables.
71    pub fn new(config: FdPressureConfig) -> Self {
72        Self {
73            config,
74            state: Mutex::new(GuardState::default()),
75        }
76    }
77
78    /// Classify one accept error. Fd-exhaustion errors demote the broker;
79    /// anything else is reported back to the caller as unrelated.
80    pub fn on_accept_error(&self, err: &io::Error) -> FdPressureDecision {
81        if !is_fd_exhaustion_error(err) {
82            return FdPressureDecision::Unrelated;
83        }
84        let mut state = self.lock();
85        if !state.demoted {
86            state.demoted = true;
87            state.demotions_total += 1;
88        }
89        state.consecutive_ok = 0;
90        FdPressureDecision::Demoted
91    }
92
93    /// Record one successful accept. Returns `true` when this accept
94    /// cleared an active demotion.
95    pub fn on_accept_ok(&self) -> bool {
96        let mut state = self.lock();
97        if !state.demoted {
98            return false;
99        }
100        state.consecutive_ok += 1;
101        if state.consecutive_ok >= self.config.recovery_accepts {
102            state.demoted = false;
103            state.consecutive_ok = 0;
104            return true;
105        }
106        false
107    }
108
109    /// Whether new Hello connections are currently being refused.
110    pub fn is_demoted(&self) -> bool {
111        self.lock().demoted
112    }
113
114    /// Total demotion episodes since the guard was created.
115    pub fn demotions_total(&self) -> u64 {
116        self.lock().demotions_total
117    }
118
119    /// Hello connections refused with `ERROR_FD_PRESSURE` so far.
120    pub fn refused_while_demoted(&self) -> u64 {
121        self.lock().refused_while_demoted
122    }
123
124    /// Structured refusal sent to Hello clients while demoted.
125    pub fn refusal_reply(&self) -> HelloReply {
126        self.lock().refused_while_demoted += 1;
127        refused_reply(
128            ErrorCode::ErrorFdPressure,
129            "broker is low on file descriptors; retry shortly",
130            self.config.retry_after_ms,
131        )
132    }
133
134    /// Force a demotion without an accept error (tests / external probes).
135    pub fn force_demote(&self) {
136        let mut state = self.lock();
137        if !state.demoted {
138            state.demoted = true;
139            state.demotions_total += 1;
140        }
141        state.consecutive_ok = 0;
142    }
143
144    fn lock(&self) -> std::sync::MutexGuard<'_, GuardState> {
145        self.state
146            .lock()
147            .unwrap_or_else(|poisoned| poisoned.into_inner())
148    }
149}
150
151/// True when `err` signals process- or system-wide fd exhaustion.
152pub fn is_fd_exhaustion_error(err: &io::Error) -> bool {
153    let Some(code) = err.raw_os_error() else {
154        return false;
155    };
156    #[cfg(unix)]
157    {
158        code == libc::EMFILE || code == libc::ENFILE
159    }
160    #[cfg(windows)]
161    {
162        // WSAEMFILE, ERROR_TOO_MANY_OPEN_FILES, ERROR_NO_SYSTEM_RESOURCES.
163        code == 10024 || code == 4 || code == 1450
164    }
165}
166
167/// One platform-appropriate fd-exhaustion raw error code (test helper).
168pub fn fd_exhaustion_error_for_tests() -> io::Error {
169    #[cfg(unix)]
170    {
171        io::Error::from_raw_os_error(libc::EMFILE)
172    }
173    #[cfg(windows)]
174    {
175        io::Error::from_raw_os_error(10024)
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::protocol::hello_reply::Result as HelloReplyResult;

    #[test]
    fn unrelated_errors_do_not_demote() {
        let guard = FdPressureGuard::default();
        let err = io::Error::new(io::ErrorKind::PermissionDenied, "denied");
        assert_eq!(guard.on_accept_error(&err), FdPressureDecision::Unrelated);
        assert!(!guard.is_demoted());
        assert_eq!(guard.demotions_total(), 0);
    }

    #[test]
    fn fd_exhaustion_demotes_and_recovers_after_streak() {
        let guard = FdPressureGuard::new(FdPressureConfig {
            recovery_accepts: 2,
            retry_after_ms: 250,
        });
        assert_eq!(
            guard.on_accept_error(&fd_exhaustion_error_for_tests()),
            FdPressureDecision::Demoted
        );
        assert!(guard.is_demoted());
        assert_eq!(guard.demotions_total(), 1);

        assert!(!guard.on_accept_ok());
        assert!(guard.is_demoted());
        assert!(guard.on_accept_ok());
        assert!(!guard.is_demoted());
    }

    #[test]
    fn accept_error_resets_recovery_streak() {
        let guard = FdPressureGuard::new(FdPressureConfig {
            recovery_accepts: 2,
            retry_after_ms: 250,
        });
        guard.on_accept_error(&fd_exhaustion_error_for_tests());
        assert!(!guard.on_accept_ok());
        guard.on_accept_error(&fd_exhaustion_error_for_tests());
        assert!(!guard.on_accept_ok());
        assert!(guard.is_demoted());
        assert!(guard.on_accept_ok());
        assert!(!guard.is_demoted());
        assert_eq!(guard.demotions_total(), 1);
    }

    /// Several fd errors in a row are one demotion episode, not several.
    #[test]
    fn repeated_fd_errors_count_one_episode() {
        let guard = FdPressureGuard::default();
        for _ in 0..3 {
            assert_eq!(
                guard.on_accept_error(&fd_exhaustion_error_for_tests()),
                FdPressureDecision::Demoted
            );
        }
        assert!(guard.is_demoted());
        assert_eq!(guard.demotions_total(), 1);
    }

    /// Demoting again after a full recovery starts a new episode.
    #[test]
    fn demotion_after_recovery_is_a_new_episode() {
        let guard = FdPressureGuard::new(FdPressureConfig {
            recovery_accepts: 1,
            retry_after_ms: 250,
        });
        guard.force_demote();
        assert!(guard.on_accept_ok());
        assert!(!guard.is_demoted());
        guard.on_accept_error(&fd_exhaustion_error_for_tests());
        assert!(guard.is_demoted());
        assert_eq!(guard.demotions_total(), 2);
    }

    /// A zero streak requirement behaves like one: the first success clears.
    #[test]
    fn zero_recovery_accepts_clears_on_first_success() {
        let guard = FdPressureGuard::new(FdPressureConfig {
            recovery_accepts: 0,
            retry_after_ms: 1,
        });
        guard.force_demote();
        assert!(guard.on_accept_ok());
        assert!(!guard.is_demoted());
    }

    #[test]
    fn refusal_reply_uses_reserved_fd_pressure_code() {
        let guard = FdPressureGuard::default();
        guard.force_demote();
        let reply = guard.refusal_reply();
        let HelloReplyResult::Refused(refused) = reply.result.unwrap() else {
            panic!("expected refusal");
        };
        assert_eq!(
            ErrorCode::try_from(refused.code),
            Ok(ErrorCode::ErrorFdPressure)
        );
        assert_eq!(refused.retry_after_ms, DEFAULT_FD_PRESSURE_RETRY_AFTER_MS);
        assert_eq!(guard.refused_while_demoted(), 1);
    }

    /// A non-default `retry_after_ms` is what clients actually receive.
    #[test]
    fn refusal_reply_honours_configured_retry_hint() {
        let guard = FdPressureGuard::new(FdPressureConfig {
            recovery_accepts: 1,
            retry_after_ms: 250,
        });
        guard.force_demote();
        let HelloReplyResult::Refused(refused) = guard.refusal_reply().result.unwrap() else {
            panic!("expected refusal");
        };
        assert_eq!(refused.retry_after_ms, 250);
    }

    #[test]
    fn ok_accepts_while_healthy_are_no_ops() {
        let guard = FdPressureGuard::default();
        assert!(!guard.on_accept_ok());
        assert!(!guard.is_demoted());
    }

    /// Errors without a raw OS code are never treated as fd exhaustion.
    #[test]
    fn synthetic_errors_are_not_fd_exhaustion() {
        assert!(is_fd_exhaustion_error(&fd_exhaustion_error_for_tests()));
        let synthetic = io::Error::new(io::ErrorKind::ConnectionAborted, "aborted");
        assert!(!is_fd_exhaustion_error(&synthetic));
    }

    #[cfg(unix)]
    #[test]
    fn unix_emfile_and_enfile_are_fd_exhaustion() {
        assert!(is_fd_exhaustion_error(&io::Error::from_raw_os_error(libc::EMFILE)));
        assert!(is_fd_exhaustion_error(&io::Error::from_raw_os_error(libc::ENFILE)));
        assert!(!is_fd_exhaustion_error(&io::Error::from_raw_os_error(libc::EACCES)));
    }

    #[cfg(windows)]
    #[test]
    fn windows_handle_exhaustion_codes_are_fd_exhaustion() {
        for code in [10024, 4, 1450] {
            assert!(is_fd_exhaustion_error(&io::Error::from_raw_os_error(code)));
        }
        assert!(!is_fd_exhaustion_error(&io::Error::from_raw_os_error(5)));
    }
}