use obzenflow_core::journal::JournalError;
use std::io;
#[derive(Clone, Copy, Debug)]
pub(crate) struct NofileLimit {
pub(crate) soft: u64,
pub(crate) hard: u64,
}
#[derive(Clone, Copy, Debug)]
pub(crate) struct DiskJournalFdEstimate {
pub(crate) stages: usize,
pub(crate) edges: usize,
pub(crate) metrics_enabled: bool,
pub(crate) estimated_fds: u64,
pub(crate) breakdown: DiskJournalFdBreakdown,
}
#[derive(Clone, Copy, Debug)]
pub(crate) struct DiskJournalFdBreakdown {
pub(crate) writer_fds: u64,
pub(crate) stage_reader_fds: u64,
pub(crate) metrics_reader_fds: u64,
pub(crate) system_reader_fds: u64,
pub(crate) overhead_fds: u64,
}
pub(crate) fn estimate_disk_journal_fds(
stages: usize,
edges: usize,
metrics_enabled: bool,
) -> DiskJournalFdEstimate {
let stage_writers = 2u64.saturating_mul(stages as u64); let system_writer = 1u64;
let stage_readers = edges as u64;
let metrics_readers = if metrics_enabled {
(2u64.saturating_mul(stages as u64)).saturating_add(1)
} else {
0
};
let system_readers = 2u64;
let overhead = 32u64;
let estimated = stage_writers
.saturating_add(system_writer)
.saturating_add(stage_readers)
.saturating_add(metrics_readers)
.saturating_add(system_readers)
.saturating_add(overhead);
DiskJournalFdEstimate {
stages,
edges,
metrics_enabled,
estimated_fds: estimated,
breakdown: DiskJournalFdBreakdown {
writer_fds: stage_writers.saturating_add(system_writer),
stage_reader_fds: stage_readers,
metrics_reader_fds: metrics_readers,
system_reader_fds: system_readers,
overhead_fds: overhead,
},
}
}
pub(crate) fn journal_error_is_too_many_open_files(err: &JournalError) -> bool {
match err {
JournalError::Implementation { source, .. } => {
if let Some(io_error) = source.downcast_ref::<io::Error>() {
io_error_is_too_many_open_files(io_error)
} else {
false
}
}
_ => false,
}
}
pub(crate) fn io_error_is_too_many_open_files(err: &io::Error) -> bool {
#[cfg(unix)]
{
matches!(err.raw_os_error(), Some(libc::EMFILE) | Some(libc::ENFILE))
}
#[cfg(not(unix))]
{
false
}
}
pub(crate) fn env_try_raise_nofile() -> bool {
std::env::var("OBZENFLOW_TRY_RAISE_NOFILE")
.ok()
.and_then(|v| v.parse::<bool>().ok())
.unwrap_or(false)
}
#[cfg(unix)]
fn get_nofile_limit() -> io::Result<NofileLimit> {
unsafe {
let mut current = libc::rlimit {
rlim_cur: 0,
rlim_max: 0,
};
if libc::getrlimit(libc::RLIMIT_NOFILE, &mut current) != 0 {
return Err(io::Error::last_os_error());
}
Ok(NofileLimit {
soft: current.rlim_cur,
hard: current.rlim_max,
})
}
}
#[cfg(not(unix))]
fn get_nofile_limit() -> io::Result<NofileLimit> {
Err(io::Error::new(
io::ErrorKind::Unsupported,
"RLIMIT_NOFILE is only available on unix",
))
}
#[cfg(unix)]
fn try_raise_nofile_soft_limit(desired_soft: u64) -> io::Result<NofileLimit> {
unsafe {
let current = get_nofile_limit()?;
if current.soft >= desired_soft {
return Ok(current);
}
let new_soft = std::cmp::min(desired_soft, current.hard);
let updated = libc::rlimit {
rlim_cur: new_soft as libc::rlim_t,
rlim_max: current.hard as libc::rlim_t,
};
if libc::setrlimit(libc::RLIMIT_NOFILE, &updated) != 0 {
return Err(io::Error::last_os_error());
}
get_nofile_limit()
}
}
#[cfg(not(unix))]
fn try_raise_nofile_soft_limit(_desired_soft: u64) -> io::Result<NofileLimit> {
get_nofile_limit()
}
pub(crate) fn preflight_nofile_for_disk_journals(
estimate: DiskJournalFdEstimate,
try_raise: bool,
) -> Result<Option<NofileLimit>, String> {
let current = match get_nofile_limit() {
Ok(limit) => limit,
Err(_) => return Ok(None),
};
if current.soft >= estimate.estimated_fds {
return Ok(Some(current));
}
if try_raise {
match try_raise_nofile_soft_limit(estimate.estimated_fds) {
Ok(raised) if raised.soft >= estimate.estimated_fds => return Ok(Some(raised)),
Ok(raised) => {
return Err(format!(
"Disk-journal pipeline requires ~{} file descriptors (stages={}, edges={}, metrics_enabled={}). RLIMIT_NOFILE soft={} hard={} (raised attempt reached soft={}). Increase your process file-descriptor limit (e.g. `ulimit -n {}`) or reduce pipeline size / disable metrics (`OBZENFLOW_METRICS_ENABLED=false`).",
estimate.estimated_fds,
estimate.stages,
estimate.edges,
estimate.metrics_enabled,
current.soft,
current.hard,
raised.soft,
estimate.estimated_fds
));
}
Err(e) => {
return Err(format!(
"Disk-journal pipeline requires ~{} file descriptors (stages={}, edges={}, metrics_enabled={}). RLIMIT_NOFILE soft={} hard={}. Attempt to raise soft limit failed: {}. Increase your process file-descriptor limit (e.g. `ulimit -n {}`) or reduce pipeline size / disable metrics (`OBZENFLOW_METRICS_ENABLED=false`).",
estimate.estimated_fds,
estimate.stages,
estimate.edges,
estimate.metrics_enabled,
current.soft,
current.hard,
e,
estimate.estimated_fds
));
}
}
}
Err(format!(
"Disk-journal pipeline requires ~{} file descriptors (stages={}, edges={}, metrics_enabled={}). RLIMIT_NOFILE soft={} hard={}. Increase your process file-descriptor limit (e.g. `ulimit -n {}`) or reduce pipeline size / disable metrics (`OBZENFLOW_METRICS_ENABLED=false`).",
estimate.estimated_fds,
estimate.stages,
estimate.edges,
estimate.metrics_enabled,
current.soft,
current.hard,
estimate.estimated_fds
))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
#[cfg(unix)]
fn detects_emfile_and_enfile_from_raw_os_error() {
let emfile = io::Error::from_raw_os_error(libc::EMFILE);
assert!(io_error_is_too_many_open_files(&emfile));
let enfile = io::Error::from_raw_os_error(libc::ENFILE);
assert!(io_error_is_too_many_open_files(&enfile));
let eacces = io::Error::from_raw_os_error(libc::EACCES);
assert!(!io_error_is_too_many_open_files(&eacces));
}
#[test]
#[cfg(unix)]
fn detects_too_many_open_files_inside_journal_error() {
let err = JournalError::Implementation {
message: "open failed".to_string(),
source: Box::new(io::Error::from_raw_os_error(libc::EMFILE)),
};
assert!(journal_error_is_too_many_open_files(&err));
}
#[test]
fn ignores_non_io_journal_errors() {
let err = JournalError::Implementation {
message: "open failed".to_string(),
source: "not an io::Error".into(),
};
assert!(!journal_error_is_too_many_open_files(&err));
}
}