use std::io;
use std::sync::Mutex;
use crate::broker::protocol::{ErrorCode, HelloReply};
use super::connection::refused_reply;
pub const DEFAULT_FD_PRESSURE_RECOVERY_ACCEPTS: u32 = 3;
pub const DEFAULT_FD_PRESSURE_RETRY_AFTER_MS: u64 = 1_000;
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FdPressureDecision {
Demoted,
Unrelated,
}
#[derive(Clone, Copy, Debug)]
pub struct FdPressureConfig {
pub recovery_accepts: u32,
pub retry_after_ms: u64,
}
impl Default for FdPressureConfig {
fn default() -> Self {
Self {
recovery_accepts: DEFAULT_FD_PRESSURE_RECOVERY_ACCEPTS,
retry_after_ms: DEFAULT_FD_PRESSURE_RETRY_AFTER_MS,
}
}
}
#[derive(Clone, Copy, Debug, Default)]
struct GuardState {
demoted: bool,
consecutive_ok: u32,
demotions_total: u64,
refused_while_demoted: u64,
}
#[derive(Debug, Default)]
pub struct FdPressureGuard {
config: FdPressureConfig,
state: Mutex<GuardState>,
}
impl FdPressureGuard {
pub fn new(config: FdPressureConfig) -> Self {
Self {
config,
state: Mutex::new(GuardState::default()),
}
}
pub fn on_accept_error(&self, err: &io::Error) -> FdPressureDecision {
if !is_fd_exhaustion_error(err) {
return FdPressureDecision::Unrelated;
}
let mut state = self.lock();
if !state.demoted {
state.demoted = true;
state.demotions_total += 1;
}
state.consecutive_ok = 0;
FdPressureDecision::Demoted
}
pub fn on_accept_ok(&self) -> bool {
let mut state = self.lock();
if !state.demoted {
return false;
}
state.consecutive_ok += 1;
if state.consecutive_ok >= self.config.recovery_accepts {
state.demoted = false;
state.consecutive_ok = 0;
return true;
}
false
}
pub fn is_demoted(&self) -> bool {
self.lock().demoted
}
pub fn demotions_total(&self) -> u64 {
self.lock().demotions_total
}
pub fn refused_while_demoted(&self) -> u64 {
self.lock().refused_while_demoted
}
pub fn refusal_reply(&self) -> HelloReply {
self.lock().refused_while_demoted += 1;
refused_reply(
ErrorCode::ErrorFdPressure,
"broker is low on file descriptors; retry shortly",
self.config.retry_after_ms,
)
}
pub fn force_demote(&self) {
let mut state = self.lock();
if !state.demoted {
state.demoted = true;
state.demotions_total += 1;
}
state.consecutive_ok = 0;
}
fn lock(&self) -> std::sync::MutexGuard<'_, GuardState> {
self.state
.lock()
.unwrap_or_else(|poisoned| poisoned.into_inner())
}
}
pub fn is_fd_exhaustion_error(err: &io::Error) -> bool {
let Some(code) = err.raw_os_error() else {
return false;
};
#[cfg(unix)]
{
code == libc::EMFILE || code == libc::ENFILE
}
#[cfg(windows)]
{
code == 10024 || code == 4 || code == 1450
}
}
pub fn fd_exhaustion_error_for_tests() -> io::Error {
#[cfg(unix)]
{
io::Error::from_raw_os_error(libc::EMFILE)
}
#[cfg(windows)]
{
io::Error::from_raw_os_error(10024)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::broker::protocol::hello_reply::Result as HelloReplyResult;
#[test]
fn unrelated_errors_do_not_demote() {
let guard = FdPressureGuard::default();
let err = io::Error::new(io::ErrorKind::PermissionDenied, "denied");
assert_eq!(guard.on_accept_error(&err), FdPressureDecision::Unrelated);
assert!(!guard.is_demoted());
assert_eq!(guard.demotions_total(), 0);
}
#[test]
fn fd_exhaustion_demotes_and_recovers_after_streak() {
let guard = FdPressureGuard::new(FdPressureConfig {
recovery_accepts: 2,
retry_after_ms: 250,
});
assert_eq!(
guard.on_accept_error(&fd_exhaustion_error_for_tests()),
FdPressureDecision::Demoted
);
assert!(guard.is_demoted());
assert_eq!(guard.demotions_total(), 1);
assert!(!guard.on_accept_ok());
assert!(guard.is_demoted());
assert!(guard.on_accept_ok());
assert!(!guard.is_demoted());
}
#[test]
fn accept_error_resets_recovery_streak() {
let guard = FdPressureGuard::new(FdPressureConfig {
recovery_accepts: 2,
retry_after_ms: 250,
});
guard.on_accept_error(&fd_exhaustion_error_for_tests());
assert!(!guard.on_accept_ok());
guard.on_accept_error(&fd_exhaustion_error_for_tests());
assert!(!guard.on_accept_ok());
assert!(guard.is_demoted());
assert!(guard.on_accept_ok());
assert!(!guard.is_demoted());
assert_eq!(guard.demotions_total(), 1);
}
#[test]
fn refusal_reply_uses_reserved_fd_pressure_code() {
let guard = FdPressureGuard::default();
guard.force_demote();
let reply = guard.refusal_reply();
let HelloReplyResult::Refused(refused) = reply.result.unwrap() else {
panic!("expected refusal");
};
assert_eq!(
ErrorCode::try_from(refused.code),
Ok(ErrorCode::ErrorFdPressure)
);
assert_eq!(refused.retry_after_ms, DEFAULT_FD_PRESSURE_RETRY_AFTER_MS);
assert_eq!(guard.refused_while_demoted(), 1);
}
#[test]
fn ok_accepts_while_healthy_are_no_ops() {
let guard = FdPressureGuard::default();
assert!(!guard.on_accept_ok());
assert!(!guard.is_demoted());
}
}