running_process/broker/server/
fd_pressure.rs1use std::io;
16use std::sync::Mutex;
17
18use crate::broker::protocol::{ErrorCode, HelloReply};
19
20use super::connection::refused_reply;
21
/// Default length of the streak of successful accepts that a demoted
/// [`FdPressureGuard`] needs before it returns to healthy.
pub const DEFAULT_FD_PRESSURE_RECOVERY_ACCEPTS: u32 = 3;
/// Default `retry_after_ms` hint placed in refusal replies while demoted.
pub const DEFAULT_FD_PRESSURE_RETRY_AFTER_MS: u64 = 1_000;
26
/// How a failed `accept` was classified by [`FdPressureGuard::on_accept_error`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FdPressureDecision {
    /// The error signalled descriptor exhaustion; the guard is now demoted.
    Demoted,
    /// The error was something else; the guard's state was not touched.
    Unrelated,
}
35
/// Tuning parameters for an [`FdPressureGuard`].
#[derive(Debug, Clone, Copy)]
pub struct FdPressureConfig {
    /// Successful accepts in a row needed, while demoted, to become healthy again.
    pub recovery_accepts: u32,
    /// Value copied into the `retry_after_ms` field of refusal replies.
    pub retry_after_ms: u64,
}
44
45impl Default for FdPressureConfig {
46 fn default() -> Self {
47 Self {
48 recovery_accepts: DEFAULT_FD_PRESSURE_RECOVERY_ACCEPTS,
49 retry_after_ms: DEFAULT_FD_PRESSURE_RETRY_AFTER_MS,
50 }
51 }
52}
53
/// Mutable bookkeeping kept behind the guard's mutex.
#[derive(Debug, Default, Clone, Copy)]
struct GuardState {
    /// Whether the guard is currently demoted.
    demoted: bool,
    /// Successful accepts counted while demoted; cleared on recovery and on
    /// every descriptor-exhaustion error.
    consecutive_ok: u32,
    /// Number of healthy-to-demoted transitions observed.
    demotions_total: u64,
    /// Number of refusal replies built so far.
    refused_while_demoted: u64,
}
61
62#[derive(Debug, Default)]
64pub struct FdPressureGuard {
65 config: FdPressureConfig,
66 state: Mutex<GuardState>,
67}
68
69impl FdPressureGuard {
70 pub fn new(config: FdPressureConfig) -> Self {
72 Self {
73 config,
74 state: Mutex::new(GuardState::default()),
75 }
76 }
77
78 pub fn on_accept_error(&self, err: &io::Error) -> FdPressureDecision {
81 if !is_fd_exhaustion_error(err) {
82 return FdPressureDecision::Unrelated;
83 }
84 let mut state = self.lock();
85 if !state.demoted {
86 state.demoted = true;
87 state.demotions_total += 1;
88 }
89 state.consecutive_ok = 0;
90 FdPressureDecision::Demoted
91 }
92
93 pub fn on_accept_ok(&self) -> bool {
96 let mut state = self.lock();
97 if !state.demoted {
98 return false;
99 }
100 state.consecutive_ok += 1;
101 if state.consecutive_ok >= self.config.recovery_accepts {
102 state.demoted = false;
103 state.consecutive_ok = 0;
104 return true;
105 }
106 false
107 }
108
109 pub fn is_demoted(&self) -> bool {
111 self.lock().demoted
112 }
113
114 pub fn demotions_total(&self) -> u64 {
116 self.lock().demotions_total
117 }
118
119 pub fn refused_while_demoted(&self) -> u64 {
121 self.lock().refused_while_demoted
122 }
123
124 pub fn refusal_reply(&self) -> HelloReply {
126 self.lock().refused_while_demoted += 1;
127 refused_reply(
128 ErrorCode::ErrorFdPressure,
129 "broker is low on file descriptors; retry shortly",
130 self.config.retry_after_ms,
131 )
132 }
133
134 pub fn force_demote(&self) {
136 let mut state = self.lock();
137 if !state.demoted {
138 state.demoted = true;
139 state.demotions_total += 1;
140 }
141 state.consecutive_ok = 0;
142 }
143
144 fn lock(&self) -> std::sync::MutexGuard<'_, GuardState> {
145 self.state
146 .lock()
147 .unwrap_or_else(|poisoned| poisoned.into_inner())
148 }
149}
150
151pub fn is_fd_exhaustion_error(err: &io::Error) -> bool {
153 let Some(code) = err.raw_os_error() else {
154 return false;
155 };
156 #[cfg(unix)]
157 {
158 code == libc::EMFILE || code == libc::ENFILE
159 }
160 #[cfg(windows)]
161 {
162 code == 10024 || code == 4 || code == 1450
164 }
165}
166
167pub fn fd_exhaustion_error_for_tests() -> io::Error {
169 #[cfg(unix)]
170 {
171 io::Error::from_raw_os_error(libc::EMFILE)
172 }
173 #[cfg(windows)]
174 {
175 io::Error::from_raw_os_error(10024)
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use crate::broker::protocol::hello_reply::Result as HelloReplyResult;

    /// Guard that needs two good accepts to recover and advertises a 250 ms back-off.
    fn two_step_guard() -> FdPressureGuard {
        FdPressureGuard::new(FdPressureConfig {
            recovery_accepts: 2,
            retry_after_ms: 250,
        })
    }

    /// Feeds one descriptor-exhaustion error into `guard`.
    fn exhaust(guard: &FdPressureGuard) -> FdPressureDecision {
        guard.on_accept_error(&fd_exhaustion_error_for_tests())
    }

    #[test]
    fn unrelated_errors_do_not_demote() {
        let guard = FdPressureGuard::default();
        let denied = io::Error::new(io::ErrorKind::PermissionDenied, "denied");

        let decision = guard.on_accept_error(&denied);

        assert_eq!(decision, FdPressureDecision::Unrelated);
        assert!(!guard.is_demoted());
        assert_eq!(guard.demotions_total(), 0);
    }

    #[test]
    fn fd_exhaustion_demotes_and_recovers_after_streak() {
        let guard = two_step_guard();

        assert_eq!(exhaust(&guard), FdPressureDecision::Demoted);
        assert!(guard.is_demoted());
        assert_eq!(guard.demotions_total(), 1);

        // First good accept: streak is 1 of 2, still demoted.
        assert!(!guard.on_accept_ok());
        assert!(guard.is_demoted());
        // Second good accept completes the streak.
        assert!(guard.on_accept_ok());
        assert!(!guard.is_demoted());
    }

    #[test]
    fn accept_error_resets_recovery_streak() {
        let guard = two_step_guard();

        exhaust(&guard);
        assert!(!guard.on_accept_ok());
        // Another exhaustion error while demoted restarts the streak...
        exhaust(&guard);
        assert!(!guard.on_accept_ok());
        assert!(guard.is_demoted());
        assert!(guard.on_accept_ok());
        assert!(!guard.is_demoted());
        // ...without counting as a second demotion.
        assert_eq!(guard.demotions_total(), 1);
    }

    #[test]
    fn refusal_reply_uses_reserved_fd_pressure_code() {
        let guard = FdPressureGuard::default();
        guard.force_demote();

        let reply = guard.refusal_reply();
        let outcome = reply.result.unwrap();
        let HelloReplyResult::Refused(refused) = outcome else {
            panic!("expected refusal");
        };

        assert_eq!(
            ErrorCode::try_from(refused.code),
            Ok(ErrorCode::ErrorFdPressure)
        );
        assert_eq!(refused.retry_after_ms, DEFAULT_FD_PRESSURE_RETRY_AFTER_MS);
        assert_eq!(guard.refused_while_demoted(), 1);
    }

    #[test]
    fn ok_accepts_while_healthy_are_no_ops() {
        let guard = FdPressureGuard::default();
        assert!(!guard.on_accept_ok());
        assert!(!guard.is_demoted());
    }
}