use std::path::PathBuf;
use std::sync::{Condvar, Mutex, OnceLock};
use std::time::{Duration, Instant};
/// Per-worker overhead allowance in MiB. Presumably added by callers on top of
/// a VM's configured memory when sizing a reservation — TODO confirm at the
/// call sites; this file only checks that the value stays small.
pub const WORKER_OVERHEAD_MIB: u64 = 64;
/// Numerator of the default budget fraction: without an explicit override the
/// budget is `host RAM * DEFAULT_BUDGET_NUM / DEFAULT_BUDGET_DEN`, i.e. 80%.
const DEFAULT_BUDGET_NUM: u64 = 80;
/// Denominator of the default budget fraction (see `DEFAULT_BUDGET_NUM`).
const DEFAULT_BUDGET_DEN: u64 = 100;
/// How long an admission waits for a release before it is admitted anyway,
/// unless `SUPERMACHINE_MEMORY_ADMISSION_TIMEOUT_MS` overrides it.
const DEFAULT_ADMISSION_TIMEOUT: Duration = Duration::from_secs(60);
/// Mutable per-process accounting state, kept behind `MemoryAccountant::inner`.
struct Inner {
/// Sum of all outstanding reservations, in MiB.
committed_mib: u64,
/// Number of outstanding reservations: bumped on every admit/charge and
/// decremented on every release.
live: u64,
}
/// Admission gate that keeps the sum of reserved memory under a budget.
///
/// Works per process by default; when `host` is set, every operation is
/// delegated to the cross-process [`HostCoord`] instead.
pub struct MemoryAccountant {
/// Per-process accounting state; only used when `host` is `None`.
inner: Mutex<Inner>,
/// Signalled on every per-process release so blocked reservations re-check.
cv: Condvar,
/// Budget in MiB; `0` disables the gate (reservations become no-ops).
budget_mib: u64,
/// Maximum time a reservation waits before it is admitted over budget.
timeout: Duration,
/// Host-wide coordinator, present only when host scope was requested and
/// its shared directory could be set up.
host: Option<HostCoord>,
}
/// Process-wide accountant singleton, initialised lazily by `accountant()`.
static ACCOUNTANT: OnceLock<MemoryAccountant> = OnceLock::new();
/// Returns the process-wide [`MemoryAccountant`], constructing it on first use.
///
/// The budget and admission timeout are read from the environment / host once.
/// If the gate is enabled (budget != 0) and host scope is requested, a
/// [`HostCoord`] is attached; when its shared directory cannot be prepared a
/// warning is printed and the accountant stays per-process.
pub fn accountant() -> &'static MemoryAccountant {
    fn build() -> MemoryAccountant {
        let budget = compute_budget_mib();
        let mut built = MemoryAccountant::new(budget, admission_timeout());

        // Host scope is meaningless when the gate itself is disabled.
        let wants_host_scope = budget != 0 && budget_scope_is_host();
        if !wants_host_scope {
            return built;
        }

        let coord = HostCoord::new(host_budget_dir());
        if let Err(e) = &coord {
            eprintln!(
                "[memory-admission] host scope requested but shared dir is unusable \
                 ({e}); falling back to per-process budget"
            );
        }
        built.host = coord.ok();
        built
    }

    ACCOUNTANT.get_or_init(build)
}
/// RAII handle for a memory reservation; dropping it releases the reserved
/// amount back to the accountant it was taken from.
#[must_use = "dropping the guard immediately releases the memory reservation"]
pub struct AdmissionGuard {
/// Accountant the reservation was made on and will be released to.
accountant: &'static MemoryAccountant,
/// Reserved amount in MiB; `0` means nothing was reserved and drop is a no-op.
mib: u64,
}
impl AdmissionGuard {
    /// Amount of memory, in MiB, this guard currently holds reserved.
    ///
    /// This is `0` when the accountant handed back no reservation.
    pub fn reserved_mib(&self) -> u64 {
        let Self { mib, .. } = self;
        *mib
    }
}
impl Drop for AdmissionGuard {
    /// Hands the reservation back to the accountant; guards that hold nothing
    /// (`mib == 0`) do not touch the accountant at all.
    fn drop(&mut self) {
        match self.mib {
            0 => {}
            held => self.accountant.release(held),
        }
    }
}
impl std::fmt::Debug for AdmissionGuard {
    /// Formats the guard as `AdmissionGuard { mib: .. }`; the accountant
    /// reference is intentionally left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("AdmissionGuard");
        repr.field("mib", &self.mib);
        repr.finish()
    }
}
/// Reserves `mib` MiB on the process-wide accountant, blocking while the
/// budget is full, and returns a guard that releases the reservation on drop.
pub fn admit(mib: u64) -> AdmissionGuard {
    let global = accountant();
    admit_on(global, mib)
}
/// Reserves `mib` MiB on the given accountant (blocking as needed) and wraps
/// whatever amount the accountant actually recorded in an [`AdmissionGuard`].
fn admit_on(accountant: &'static MemoryAccountant, mib: u64) -> AdmissionGuard {
    AdmissionGuard {
        mib: accountant.reserve_blocking(mib),
        accountant,
    }
}
/// Records `mib` MiB against the process-wide accountant without waiting for
/// budget headroom, returning a guard that releases the charge on drop.
pub fn charge(mib: u64) -> AdmissionGuard {
    let global = accountant();
    AdmissionGuard {
        mib: global.charge(mib),
        accountant: global,
    }
}
impl MemoryAccountant {
fn new(budget_mib: u64, timeout: Duration) -> Self {
MemoryAccountant {
inner: Mutex::new(Inner {
committed_mib: 0,
live: 0,
}),
cv: Condvar::new(),
budget_mib,
timeout,
host: None,
}
}
pub fn budget_mib(&self) -> u64 {
self.budget_mib
}
pub fn snapshot(&self) -> (u64, u64) {
if let Some(h) = &self.host {
return h.snapshot();
}
let g = self.inner.lock().unwrap();
(g.committed_mib, g.live)
}
fn reserve_blocking(&self, mib: u64) -> u64 {
if self.budget_mib == 0 || mib == 0 {
return 0;
}
if let Some(h) = &self.host {
return h.reserve_blocking(mib, self.budget_mib, self.timeout);
}
let mut g = self.inner.lock().unwrap();
let mut waited_from: Option<Instant> = None;
loop {
if g.committed_mib == 0 || g.committed_mib + mib <= self.budget_mib {
if mib > self.budget_mib && g.committed_mib == 0 {
eprintln!(
"[memory-admission] worker footprint {mib} MiB exceeds the entire \
budget {} MiB — admitting it alone; raise \
SUPERMACHINE_MEMORY_BUDGET_MIB or lower the VM's --memory",
self.budget_mib
);
}
g.committed_mib += mib;
g.live += 1;
if let Some(t0) = waited_from {
eprintln!(
"[memory-admission] admitted {mib} MiB after waiting {:?} \
(now {}/{} MiB across {} workers)",
t0.elapsed(),
g.committed_mib,
self.budget_mib,
g.live
);
}
return mib;
}
let t0 = *waited_from.get_or_insert_with(|| {
eprintln!(
"[memory-admission] spawn needs {mib} MiB but {}/{} MiB already \
committed across {} workers — waiting for a release rather than \
overcommitting host RAM (set SUPERMACHINE_MEMORY_BUDGET_MIB=0 to disable)",
g.committed_mib, self.budget_mib, g.live
);
Instant::now()
});
let elapsed = t0.elapsed();
if elapsed >= self.timeout {
eprintln!(
"[memory-admission] waited {:?} for {mib} MiB with no release \
({}/{} MiB committed across {} workers) — admitting anyway to avoid a \
hang; the host may now be overcommitted (lower pool size / VM --memory, \
or raise SUPERMACHINE_MEMORY_BUDGET_MIB)",
elapsed, g.committed_mib, self.budget_mib, g.live
);
g.committed_mib += mib;
g.live += 1;
return mib;
}
let (ng, _) = self.cv.wait_timeout(g, self.timeout - elapsed).unwrap();
g = ng;
}
}
fn charge(&self, mib: u64) -> u64 {
if self.budget_mib == 0 || mib == 0 {
return 0;
}
if let Some(h) = &self.host {
return h.charge(mib);
}
let mut g = self.inner.lock().unwrap();
g.committed_mib += mib;
g.live += 1;
mib
}
fn release(&self, mib: u64) {
if let Some(h) = &self.host {
h.release(mib);
return;
}
{
let mut g = self.inner.lock().unwrap();
g.committed_mib = g.committed_mib.saturating_sub(mib);
g.live = g.live.saturating_sub(1);
}
self.cv.notify_all();
}
#[cfg(test)]
fn with_budget_for_test(budget_mib: u64) -> &'static MemoryAccountant {
Box::leak(Box::new(MemoryAccountant::new(
budget_mib,
DEFAULT_ADMISSION_TIMEOUT,
)))
}
#[cfg(test)]
fn with_budget_and_timeout_for_test(
budget_mib: u64,
timeout: Duration,
) -> &'static MemoryAccountant {
Box::leak(Box::new(MemoryAccountant::new(budget_mib, timeout)))
}
}
/// Determines the admission budget in MiB.
///
/// A parseable `SUPERMACHINE_MEMORY_BUDGET_MIB` wins outright (including `0`).
/// Otherwise — after warning about an unparseable value, if one was set — the
/// budget is `DEFAULT_BUDGET_NUM / DEFAULT_BUDGET_DEN` of host RAM, or `0`
/// when host RAM cannot be determined.
fn compute_budget_mib() -> u64 {
    if let Ok(v) = std::env::var("SUPERMACHINE_MEMORY_BUDGET_MIB") {
        match v.trim().parse::<u64>() {
            Ok(explicit) => return explicit,
            Err(_) => eprintln!(
                "[memory-admission] ignoring unparseable SUPERMACHINE_MEMORY_BUDGET_MIB={v:?}; \
                 falling back to the host-RAM default"
            ),
        }
    }

    match host_ram_mib() {
        0 => 0,
        total_mib => total_mib.saturating_mul(DEFAULT_BUDGET_NUM) / DEFAULT_BUDGET_DEN,
    }
}
fn admission_timeout() -> Duration {
if let Ok(v) = std::env::var("SUPERMACHINE_MEMORY_ADMISSION_TIMEOUT_MS") {
if let Ok(ms) = v.trim().parse::<u64>() {
return Duration::from_millis(ms);
}
}
DEFAULT_ADMISSION_TIMEOUT
}
/// Total physical RAM in MiB, read from the `hw.memsize` sysctl.
///
/// Returns `0` when the sysctl fails or reports zero bytes.
#[cfg(target_os = "macos")]
fn host_ram_mib() -> u64 {
    const BYTES_PER_MIB: u64 = 1024 * 1024;

    let mut bytes: u64 = 0;
    let mut out_len = std::mem::size_of::<u64>();
    let key = c"hw.memsize";
    // SAFETY: `key` is a NUL-terminated literal, the output pointer refers to
    // a live `u64` and `out_len` holds exactly that buffer's size; no new
    // value is being set (null pointer, length 0).
    let rc = unsafe {
        libc::sysctlbyname(
            key.as_ptr(),
            &mut bytes as *mut _ as *mut libc::c_void,
            &mut out_len,
            std::ptr::null_mut(),
            0,
        )
    };

    if rc != 0 || bytes == 0 {
        return 0;
    }
    bytes / BYTES_PER_MIB
}
/// Total physical RAM in MiB, derived from the `MemTotal:` row of
/// `/proc/meminfo`.
///
/// Returns `0` when the file cannot be read or the row is missing/unparseable.
#[cfg(not(target_os = "macos"))]
fn host_ram_mib() -> u64 {
    let Ok(meminfo) = std::fs::read_to_string("/proc/meminfo") else {
        return 0;
    };
    match parse_meminfo_total_kb(&meminfo) {
        Some(total_kb) => total_kb / 1024,
        None => 0,
    }
}
/// Extracts the `MemTotal:` value from `/proc/meminfo`-formatted text.
///
/// Returns the first whitespace-separated token after a `MemTotal:` prefix
/// that parses as `u64`; rows whose value does not parse are skipped and later
/// rows are still considered. `None` if no row qualifies.
#[cfg_attr(target_os = "macos", allow(dead_code))]
fn parse_meminfo_total_kb(s: &str) -> Option<u64> {
    s.lines()
        .filter_map(|line| line.strip_prefix("MemTotal:"))
        .filter_map(|rest| rest.split_whitespace().next())
        .find_map(|token| token.parse::<u64>().ok())
}
/// Extracts the `VmRSS:` value from `/proc/<pid>/status`-formatted text.
///
/// Returns the first whitespace-separated token after a `VmRSS:` prefix that
/// parses as `u64`; rows whose value does not parse are skipped. `None` if no
/// row qualifies.
#[cfg_attr(target_os = "macos", allow(dead_code))]
fn parse_status_vmrss_kb(s: &str) -> Option<u64> {
    for line in s.lines() {
        let parsed = line
            .strip_prefix("VmRSS:")
            .and_then(|rest| rest.split_whitespace().next())
            .and_then(|token| token.parse::<u64>().ok());
        if parsed.is_some() {
            return parsed;
        }
    }
    None
}
/// Extracts the `avg10=` figure from the `some` row of PSI-formatted text
/// (the layout of `/proc/pressure/memory`).
///
/// Only the first row starting with `"some "` is inspected; `None` is returned
/// when there is no such row, it has no `avg10=` token, or the value is not a
/// valid `f64`.
#[cfg_attr(target_os = "macos", allow(dead_code))]
fn parse_psi_some_avg10(s: &str) -> Option<f64> {
    let some_row = s.lines().find_map(|line| line.strip_prefix("some "))?;
    let raw = some_row
        .split_whitespace()
        .find_map(|token| token.strip_prefix("avg10="))?;
    raw.parse::<f64>().ok()
}
/// Physical memory footprint of process `pid` in MiB, as reported by
/// `proc_pid_rusage` (`ri_phys_footprint`).
///
/// Returns `None` when the call fails.
#[cfg(target_os = "macos")]
pub fn phys_footprint_mib(pid: u32) -> Option<u64> {
    // SAFETY: presumably sound because `rusage_info_v2` is a plain C struct
    // for which the all-zero bit pattern is valid — verify against the libc
    // definition.
    let mut usage: libc::rusage_info_v2 = unsafe { std::mem::zeroed() };
    let out = &mut usage as *mut libc::rusage_info_v2 as *mut libc::rusage_info_t;
    // SAFETY: `out` points at a live `rusage_info_v2`, matching the
    // `RUSAGE_INFO_V2` flavor requested.
    let status = unsafe { libc::proc_pid_rusage(pid as libc::c_int, libc::RUSAGE_INFO_V2, out) };
    (status == 0).then(|| usage.ri_phys_footprint / (1024 * 1024))
}
/// Resident set size of process `pid` in MiB, taken from the `VmRSS:` row of
/// `/proc/<pid>/status`.
///
/// Returns `None` when the file cannot be read or has no parseable row.
#[cfg(not(target_os = "macos"))]
pub fn phys_footprint_mib(pid: u32) -> Option<u64> {
    let path = format!("/proc/{pid}/status");
    match std::fs::read_to_string(path) {
        Ok(status) => parse_status_vmrss_kb(&status).map(|rss_kb| rss_kb / 1024),
        Err(_) => None,
    }
}
/// Current memory-pressure level, read from the
/// `kern.memorystatus_vm_pressure_level` sysctl.
///
/// Returns `1` when the sysctl cannot be read.
#[cfg(target_os = "macos")]
pub fn memory_pressure_level() -> u32 {
    const FALLBACK_LEVEL: u32 = 1;

    let mut reported: u32 = FALLBACK_LEVEL;
    let mut out_len = std::mem::size_of::<u32>();
    let key = c"kern.memorystatus_vm_pressure_level";
    // SAFETY: `key` is a NUL-terminated literal, the output pointer refers to
    // a live `u32` and `out_len` holds exactly that buffer's size; no new
    // value is being set (null pointer, length 0).
    let rc = unsafe {
        libc::sysctlbyname(
            key.as_ptr(),
            &mut reported as *mut u32 as *mut libc::c_void,
            &mut out_len,
            std::ptr::null_mut(),
            0,
        )
    };

    match rc {
        0 => reported,
        _ => FALLBACK_LEVEL,
    }
}
#[cfg(not(target_os = "macos"))]
pub fn memory_pressure_level() -> u32 {
match std::fs::read_to_string("/proc/pressure/memory")
.ok()
.and_then(|s| parse_psi_some_avg10(&s))
{
Some(avg10) => psi_avg10_to_level(avg10),
None => 1,
}
}
/// Grades a PSI `avg10` percentage into a pressure level:
/// `>= 20.0` → `4`, `>= 5.0` → `2`, anything else (including NaN) → `1`.
#[cfg_attr(target_os = "macos", allow(dead_code))]
fn psi_avg10_to_level(avg10: f64) -> u32 {
    match avg10 {
        stalled if stalled >= 20.0 => 4,
        stalled if stalled >= 5.0 => 2,
        _ => 1,
    }
}
/// Blocks while the host reports memory pressure at or above the warning
/// level (`2`), polling every 250 ms for at most 10 seconds.
///
/// Prints one notice when it starts waiting, and then either a "relieved"
/// message once pressure drops or an "admitting anyway" message when the
/// 10-second cap is reached. Returns immediately (silently) when there is no
/// pressure to begin with.
pub fn await_pressure_relief() {
    const WARN: u32 = 2;
    let timeout = Duration::from_secs(10);
    let poll_every = Duration::from_millis(250);
    let t0 = Instant::now();
    let mut announced = false;

    loop {
        if memory_pressure_level() < WARN {
            break;
        }
        if t0.elapsed() >= timeout {
            eprintln!(
                "[memory-admission] host under memory pressure for {:?} — admitting anyway \
                 to avoid a stall",
                t0.elapsed()
            );
            return;
        }
        if !announced {
            eprintln!(
                "[memory-admission] host memory pressure elevated — pausing new spawns until \
                 it relieves"
            );
            announced = true;
        }
        std::thread::sleep(poll_every);
    }

    // Only report relief if we previously told the user we were waiting.
    if announced {
        eprintln!(
            "[memory-admission] memory pressure relieved after {:?}",
            t0.elapsed()
        );
    }
}
/// Whether `SUPERMACHINE_MEMORY_BUDGET_SCOPE` requests host-wide accounting.
///
/// True only when the variable is set and, after trimming whitespace, equals
/// `host` ignoring ASCII case.
fn budget_scope_is_host() -> bool {
    matches!(
        std::env::var("SUPERMACHINE_MEMORY_BUDGET_SCOPE"),
        Ok(scope) if scope.trim().eq_ignore_ascii_case("host")
    )
}
/// Directory used for host-wide coordination files.
///
/// `SUPERMACHINE_MEMORY_BUDGET_DIR` is used verbatim (it is not trimmed) when
/// it is set and not blank; otherwise the directory is
/// `supermachine-mem-admission` under the system temp dir.
fn host_budget_dir() -> PathBuf {
    match std::env::var("SUPERMACHINE_MEMORY_BUDGET_DIR") {
        Ok(custom) if !custom.trim().is_empty() => PathBuf::from(custom),
        _ => std::env::temp_dir().join("supermachine-mem-admission"),
    }
}
/// Cross-process coordinator for a host-wide memory budget.
///
/// Each participating process publishes its committed MiB in a file named
/// after its PID inside `dir`; access is serialised with `flock` on a lock
/// file in the same directory.
struct HostCoord {
/// Shared directory holding the lock file and the per-PID files.
dir: PathBuf,
/// Path of the lock file (`<dir>/.lock`).
lock_path: PathBuf,
/// Path of this process's own file (`<dir>/<pid>`).
my_path: PathBuf,
/// In-memory copy of this process's committed MiB.
my_committed: Mutex<u64>,
}
impl HostCoord {
fn new(dir: PathBuf) -> std::io::Result<Self> {
std::fs::create_dir_all(&dir)?;
let pid = std::process::id();
let my_path = dir.join(pid.to_string());
let lock_path = dir.join(".lock");
std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(false)
.open(&lock_path)?;
Ok(HostCoord {
dir,
lock_path,
my_path,
my_committed: Mutex::new(0),
})
}
fn with_lock<T>(&self, f: impl FnOnce() -> T) -> Option<T> {
use std::os::unix::io::AsRawFd;
let file = std::fs::OpenOptions::new()
.create(true)
.write(true)
.truncate(false) .open(&self.lock_path)
.ok()?;
let fd = file.as_raw_fd();
if unsafe { libc::flock(fd, libc::LOCK_EX) } != 0 {
return None;
}
let out = f();
unsafe {
libc::flock(fd, libc::LOCK_UN);
}
Some(out)
}
fn host_committed_locked(&self) -> u64 {
let mut total = 0u64;
let Ok(rd) = std::fs::read_dir(&self.dir) else {
return 0;
};
for ent in rd.flatten() {
let name = ent.file_name();
let name = name.to_string_lossy();
if name.starts_with('.') {
continue; }
let Ok(pid) = name.parse::<i32>() else {
continue;
};
let alive = unsafe { libc::kill(pid, 0) } == 0
|| std::io::Error::last_os_error().raw_os_error() == Some(libc::EPERM);
if !alive {
let _ = std::fs::remove_file(ent.path());
continue;
}
if let Ok(s) = std::fs::read_to_string(ent.path()) {
if let Ok(v) = s.trim().parse::<u64>() {
total = total.saturating_add(v);
}
}
}
total
}
fn write_my_locked(&self, mib: u64) {
if mib == 0 {
let _ = std::fs::remove_file(&self.my_path);
} else {
let _ = std::fs::write(&self.my_path, mib.to_string());
}
*self.my_committed.lock().unwrap() = mib;
}
fn reserve_blocking(&self, mib: u64, budget: u64, timeout: Duration) -> u64 {
let mut waited_from: Option<Instant> = None;
loop {
let decided = self.with_lock(|| {
let host = self.host_committed_locked(); if host == 0 || host + mib <= budget {
let mine = *self.my_committed.lock().unwrap();
self.write_my_locked(mine + mib);
true
} else {
false
}
});
match decided {
Some(true) | None => {
if let Some(t0) = waited_from {
eprintln!(
"[memory-admission/host] admitted {mib} MiB after waiting {:?}",
t0.elapsed()
);
}
if decided.is_none() {
let mine = *self.my_committed.lock().unwrap();
*self.my_committed.lock().unwrap() = mine + mib;
}
return mib;
}
Some(false) => {}
}
let t0 = *waited_from.get_or_insert_with(|| {
eprintln!(
"[memory-admission/host] spawn needs {mib} MiB but the host-wide budget \
{budget} MiB is full across processes — waiting for a release"
);
Instant::now()
});
if t0.elapsed() >= timeout {
eprintln!(
"[memory-admission/host] waited {:?} with no host-wide release — admitting \
anyway to avoid a hang (host may be overcommitted across processes)",
t0.elapsed()
);
let _ = self.with_lock(|| {
let mine = *self.my_committed.lock().unwrap();
self.write_my_locked(mine + mib);
});
return mib;
}
std::thread::sleep(Duration::from_millis(200));
}
}
fn charge(&self, mib: u64) -> u64 {
let _ = self.with_lock(|| {
let mine = *self.my_committed.lock().unwrap();
self.write_my_locked(mine + mib);
});
mib
}
fn release(&self, mib: u64) {
let _ = self.with_lock(|| {
let mine = *self.my_committed.lock().unwrap();
self.write_my_locked(mine.saturating_sub(mib));
});
}
fn snapshot(&self) -> (u64, u64) {
let host = self.with_lock(|| self.host_committed_locked()).unwrap_or(0);
let mine = *self.my_committed.lock().unwrap();
(host, mine)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// Returns a fresh, per-test coordination directory path under the system
    /// temp dir (unique per process and per call).
    fn unique_coord_dir(tag: &str) -> PathBuf {
        static SEQ: AtomicU64 = AtomicU64::new(0);
        std::env::temp_dir().join(format!(
            "sm-memadm-{tag}-{}-{}",
            std::process::id(),
            SEQ.fetch_add(1, Ordering::Relaxed)
        ))
    }

    /// `MemTotal:` is picked out of meminfo text; absent row yields `None`.
    #[test]
    fn parses_proc_meminfo_total() {
        let s = "MemTotal: 32791528 kB\nMemFree: 1234 kB\nMemAvailable: 9 kB\n";
        assert_eq!(parse_meminfo_total_kb(s), Some(32791528));
        assert_eq!(parse_meminfo_total_kb("Buffers: 1 kB\n"), None);
    }

    /// `VmRSS:` is picked out of status text; absent row yields `None`.
    #[test]
    fn parses_proc_status_vmrss() {
        let s = "Name:\tnginx\nVmPeak:\t 720896 kB\nVmRSS:\t 52428 kB\nThreads: 2\n";
        assert_eq!(parse_status_vmrss_kb(s), Some(52428));
        assert_eq!(parse_status_vmrss_kb("Name:\tx\n"), None);
    }

    /// The `some avg10` figure is parsed and graded at the 5.0 / 20.0 cut-offs.
    #[test]
    fn parses_and_grades_psi_some_avg10() {
        let s = "some avg10=12.34 avg60=4.00 avg300=1.00 total=999\n\
                 full avg10=2.00 avg60=1.00 avg300=0.50 total=10\n";
        assert_eq!(parse_psi_some_avg10(s), Some(12.34));
        assert_eq!(psi_avg10_to_level(0.0), 1);
        assert_eq!(psi_avg10_to_level(4.99), 1);
        assert_eq!(psi_avg10_to_level(5.0), 2);
        assert_eq!(psi_avg10_to_level(19.99), 2);
        assert_eq!(psi_avg10_to_level(20.0), 4);
        assert_eq!(psi_avg10_to_level(80.0), 4);
        assert_eq!(parse_psi_some_avg10("garbage\n"), None);
    }

    /// On Linux the `/proc` probes return plausible live values.
    #[cfg(target_os = "linux")]
    #[test]
    fn linux_proc_probes_return_real_values() {
        assert!(host_ram_mib() > 0, "MemTotal should be readable on Linux");
        let me = std::process::id();
        let rss = phys_footprint_mib(me).expect("VmRSS for self");
        assert!(rss > 0, "self VmRSS should be > 0 MiB, got {rss}");
        assert!(matches!(memory_pressure_level(), 1 | 2 | 4));
    }

    /// A zero budget disables the gate: nothing is recorded or reserved.
    #[test]
    fn disabled_gate_is_a_noop() {
        let acc = MemoryAccountant::with_budget_for_test(0);
        let g1 = admit_on(acc, 100_000);
        let g2 = admit_on(acc, 100_000);
        assert_eq!(acc.snapshot(), (0, 0));
        assert_eq!(g1.reserved_mib(), 0);
        assert_eq!(g2.reserved_mib(), 0);
    }

    /// Reservations add up and dropping a guard gives its share back.
    #[test]
    fn accounting_charges_and_releases() {
        let acc = MemoryAccountant::with_budget_for_test(1000);
        let g1 = admit_on(acc, 300);
        let g2 = admit_on(acc, 400);
        assert_eq!(acc.snapshot(), (700, 2));
        drop(g1);
        assert_eq!(acc.snapshot(), (400, 1));
        drop(g2);
        assert_eq!(acc.snapshot(), (0, 0));
    }

    /// A lone request larger than the whole budget is admitted, not blocked.
    #[test]
    fn over_budget_single_worker_is_admitted_not_deadlocked() {
        let acc = MemoryAccountant::with_budget_for_test(512);
        let g = admit_on(acc, 4096);
        assert_eq!(acc.snapshot(), (4096, 1));
        drop(g);
        assert_eq!(acc.snapshot(), (0, 0));
    }

    /// A request that does not fit blocks until another guard is released.
    #[test]
    fn third_spawn_blocks_until_a_release_then_proceeds() {
        let acc = MemoryAccountant::with_budget_for_test(1000);
        let g1 = admit_on(acc, 400);
        let _g2 = admit_on(acc, 400);
        assert_eq!(acc.snapshot(), (800, 2));
        let admitted = Arc::new(AtomicU64::new(0));
        let admitted_t = Arc::clone(&admitted);
        let h = std::thread::spawn(move || {
            let _g3 = admit_on(acc, 400);
            admitted_t.store(1, Ordering::SeqCst);
            std::thread::sleep(std::time::Duration::from_millis(20));
        });
        // Give the spawned thread time to reach the blocking wait.
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(
            admitted.load(Ordering::SeqCst),
            0,
            "third spawn must block while full"
        );
        assert_eq!(acc.snapshot().1, 2, "still only two live workers");
        drop(g1);
        h.join().unwrap();
        assert_eq!(
            admitted.load(Ordering::SeqCst),
            1,
            "third spawn admitted after release"
        );
    }

    /// With no release, the request is admitted over budget after the timeout.
    #[test]
    fn admits_after_timeout_when_no_release_comes() {
        let acc = MemoryAccountant::with_budget_and_timeout_for_test(
            1000,
            std::time::Duration::from_millis(80),
        );
        let _g1 = admit_on(acc, 800);
        let t0 = std::time::Instant::now();
        let g2 = admit_on(acc, 800);
        let waited = t0.elapsed();
        assert!(
            waited >= std::time::Duration::from_millis(70),
            "should have waited ~the timeout before admitting, waited {waited:?}"
        );
        assert_eq!(g2.reserved_mib(), 800, "admitted anyway after timeout");
        assert_eq!(
            acc.snapshot(),
            (1600, 2),
            "overcommitted (1600 > 1000 budget)"
        );
    }

    /// On macOS the footprint and pressure probes work for our own process.
    #[cfg(target_os = "macos")]
    #[test]
    fn phys_footprint_of_self_is_measurable() {
        let mib = phys_footprint_mib(std::process::id());
        assert!(
            mib.is_some(),
            "proc_pid_rusage must succeed for our own pid"
        );
        assert!(mib.unwrap() > 0, "our own phys_footprint should be > 0 MiB");
        assert!(memory_pressure_level() >= 1);
    }

    /// Guards against the per-worker overhead constant growing unnoticed.
    #[test]
    fn worker_overhead_is_modest() {
        assert!(
            WORKER_OVERHEAD_MIB <= 256,
            "overhead should stay a small per-worker add"
        );
    }

    /// Reserve/release round-trip plus reclamation of a file whose PID
    /// (`i32::MAX`) is expected not to exist.
    #[test]
    fn host_coord_reserves_releases_and_reclaims_stale_pids() {
        let dir = unique_coord_dir("stale");
        let coord = HostCoord::new(dir.clone()).expect("host coord");
        let budget = 1000u64;
        let timeout = Duration::from_millis(200);
        assert_eq!(coord.reserve_blocking(400, budget, timeout), 400);
        assert_eq!(coord.snapshot(), (400, 400));
        std::fs::write(dir.join(i32::MAX.to_string()), "500").unwrap();
        assert_eq!(coord.reserve_blocking(200, budget, timeout), 200);
        assert_eq!(
            coord.snapshot(),
            (600, 600),
            "stale entry reclaimed, not counted"
        );
        assert!(
            !dir.join(i32::MAX.to_string()).exists(),
            "dead-PID reservation file must be reclaimed"
        );
        coord.release(600);
        assert_eq!(coord.snapshot(), (0, 0));
        assert!(
            !coord.my_path.exists(),
            "our pid file removed when contribution hits 0"
        );
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// A live peer's file counts toward the host total; once the peer has
    /// exited its file is reclaimed.
    #[test]
    fn host_coord_counts_other_live_process_then_reclaims_when_it_dies() {
        let dir = unique_coord_dir("xproc");
        let coord = HostCoord::new(dir.clone()).expect("coord");
        let budget = 1000u64;
        let mut child = std::process::Command::new("sleep")
            .arg("30")
            .spawn()
            .expect("spawn child");
        std::fs::write(dir.join(child.id().to_string()), "700").unwrap();
        let t0 = Instant::now();
        assert_eq!(
            coord.reserve_blocking(400, budget, Duration::from_millis(60)),
            400
        );
        assert!(
            t0.elapsed() >= Duration::from_millis(50),
            "should have waited out the timeout before admitting"
        );
        assert_eq!(
            coord.snapshot(),
            (1100, 400),
            "a live peer's reservation is counted in the host-wide total"
        );
        let dead_path = dir.join(child.id().to_string());
        child.kill().ok();
        child.wait().ok();
        assert_eq!(
            coord.snapshot(),
            (400, 400),
            "dead peer's reservation reclaimed"
        );
        assert!(!dead_path.exists(), "stale peer file unlinked");
        coord.release(400);
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Partial releases keep the remainder; over-release saturates at zero.
    #[test]
    fn host_coord_partial_release_and_saturates_at_zero() {
        let dir = unique_coord_dir("release");
        let coord = HostCoord::new(dir.clone()).expect("coord");
        let timeout = Duration::from_millis(50);
        assert_eq!(coord.reserve_blocking(600, 10_000, timeout), 600);
        coord.release(200);
        assert_eq!(
            coord.snapshot(),
            (400, 400),
            "partial release leaves the remainder"
        );
        assert!(coord.my_path.exists(), "pid file persists while > 0");
        coord.release(99_999);
        assert_eq!(coord.snapshot(), (0, 0));
        assert!(!coord.my_path.exists(), "pid file removed at zero");
        let _ = std::fs::remove_dir_all(&dir);
    }

    /// Filling the budget exactly is allowed; one MiB more waits for the
    /// timeout and is then admitted.
    #[test]
    fn host_coord_budget_boundary_inclusive_then_admits_over() {
        let dir = unique_coord_dir("boundary");
        let coord = HostCoord::new(dir.clone()).expect("coord");
        let timeout = Duration::from_millis(50);
        assert_eq!(coord.reserve_blocking(600, 1000, timeout), 600);
        assert_eq!(
            coord.reserve_blocking(400, 1000, timeout),
            400,
            "reserving up to exactly the budget is inclusive"
        );
        assert_eq!(coord.snapshot(), (1000, 1000));
        let t0 = Instant::now();
        assert_eq!(
            coord.reserve_blocking(1, 1000, Duration::from_millis(60)),
            1
        );
        assert!(
            t0.elapsed() >= Duration::from_millis(50),
            "over-budget reserve must wait out the timeout"
        );
        coord.release(1001);
        let _ = std::fs::remove_dir_all(&dir);
    }
}