use std::sync::{Condvar, Mutex, OnceLock};
use std::time::{Duration, Instant};
/// Longest time a caller waits for a free spawn slot on the process-wide gate
/// before it is admitted anyway (two minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
/// Mutable state of a [`SpawnGate`]; only accessed through the gate's mutex.
struct Inner {
/// Number of slots currently reserved: incremented when a caller is
/// admitted and decremented (saturating at zero) when a slot is released.
in_flight: usize,
}
/// Counting gate that bounds how many spawns may be in flight at once.
///
/// Callers reserve a slot before spawning and give it back afterwards; when
/// all `cap` slots are taken, new callers wait on `cv` for up to `timeout`.
pub struct SpawnGate {
/// Reservation counter, protected by the mutex that `cv` waits on.
inner: Mutex<Inner>,
/// Signalled once per released slot to wake a single waiter.
cv: Condvar,
/// Maximum number of concurrent reservations; `0` disables the gate.
cap: usize,
/// Longest a caller waits for a slot before being admitted over the cap.
timeout: Duration,
}
/// Process-wide gate instance, lazily initialised by [`gate`] on first use.
static GATE: OnceLock<SpawnGate> = OnceLock::new();
/// Returns the process-wide [`SpawnGate`], creating it on the first call.
///
/// The gate is sized once from `compute_cap()` and uses `DEFAULT_TIMEOUT`;
/// every later call returns the same instance.
pub fn gate() -> &'static SpawnGate {
    let build = || SpawnGate::new(compute_cap(), DEFAULT_TIMEOUT);
    GATE.get_or_init(build)
}
/// Guard representing one reserved spawn slot on a [`SpawnGate`].
///
/// Dropping the permit returns the slot to the gate, so keep it alive for as
/// long as the spawn it protects is in progress.
#[must_use = "dropping the permit immediately frees the spawn slot"]
pub struct SpawnPermit {
/// Gate the slot was reserved on; notified when the permit is dropped.
gate: &'static SpawnGate,
/// `true` when a slot was actually reserved; `false` when the gate is
/// disabled (cap of zero), in which case dropping the permit does nothing.
held: bool,
}
impl SpawnPermit {
/// Reports whether this permit occupies a slot on its gate.
///
/// Returns `false` for permits handed out by a disabled gate (cap of zero).
pub fn is_held(&self) -> bool {
self.held
}
}
impl Drop for SpawnPermit {
    /// Hands the slot back to the gate, if this permit ever held one.
    fn drop(&mut self) {
        // Permits from a disabled gate never reserved anything.
        if !self.held {
            return;
        }
        self.gate.release();
    }
}
impl std::fmt::Debug for SpawnPermit {
    /// Formats the permit as `SpawnPermit { held: .. }`; the gate reference
    /// is intentionally left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SpawnPermit");
        out.field("held", &self.held);
        out.finish()
    }
}
/// Reserves a spawn slot on the process-wide gate.
///
/// Blocks while the gate is full, for at most the gate's timeout, after which
/// the caller is admitted anyway. When the gate is disabled (cap of zero) this
/// returns immediately with a permit that is not held.
pub fn acquire() -> SpawnPermit {
acquire_on(gate())
}
/// Reserves a slot on `gate` and wraps the outcome in a [`SpawnPermit`].
///
/// The permit's `held` flag mirrors what the gate reported: `true` when a slot
/// was taken (and must be released on drop), `false` when the gate is disabled.
fn acquire_on(gate: &'static SpawnGate) -> SpawnPermit {
    SpawnPermit {
        gate,
        held: gate.reserve_blocking(),
    }
}
impl SpawnGate {
fn new(cap: usize, timeout: Duration) -> Self {
SpawnGate {
inner: Mutex::new(Inner { in_flight: 0 }),
cv: Condvar::new(),
cap,
timeout,
}
}
pub fn cap(&self) -> usize {
self.cap
}
pub fn in_flight(&self) -> usize {
self.inner.lock().unwrap().in_flight
}
fn reserve_blocking(&self) -> bool {
if self.cap == 0 {
return false;
}
let mut g = self.inner.lock().unwrap();
let mut waited_from: Option<Instant> = None;
loop {
if g.in_flight < self.cap {
g.in_flight += 1;
return true;
}
let t0 = *waited_from.get_or_insert_with(Instant::now);
let elapsed = t0.elapsed();
if elapsed >= self.timeout {
eprintln!(
"[spawn-gate] waited {:?} for a spawn slot ({}/{} boots in flight) — \
proceeding anyway to avoid a stall (set \
SUPERMACHINE_MAX_CONCURRENT_SPAWNS=0 to disable the gate)",
elapsed, g.in_flight, self.cap
);
g.in_flight += 1;
return true;
}
let (ng, _) = self.cv.wait_timeout(g, self.timeout - elapsed).unwrap();
g = ng;
}
}
fn release(&self) {
{
let mut g = self.inner.lock().unwrap();
g.in_flight = g.in_flight.saturating_sub(1);
}
self.cv.notify_one();
}
#[cfg(test)]
fn with_cap_for_test(cap: usize, timeout: Duration) -> &'static SpawnGate {
Box::leak(Box::new(SpawnGate::new(cap, timeout)))
}
}
/// Determines how many spawns may be in flight at once.
///
/// If `SUPERMACHINE_MAX_CONCURRENT_SPAWNS` is set and, after trimming
/// whitespace, parses as a `usize`, that value is returned as-is. A value
/// that does not parse is reported on stderr and ignored. In every other case
/// the cap is four times the host's available parallelism (assumed to be 8
/// when it cannot be queried), and never less than 16.
fn compute_cap() -> usize {
    let host_default = || {
        let threads = match std::thread::available_parallelism() {
            Ok(n) => n.get(),
            Err(_) => 8,
        };
        threads.saturating_mul(4).max(16)
    };

    let Ok(v) = std::env::var("SUPERMACHINE_MAX_CONCURRENT_SPAWNS") else {
        return host_default();
    };

    match v.trim().parse::<usize>() {
        Ok(n) => n,
        Err(_) => {
            eprintln!(
                "[spawn-gate] ignoring unparseable SUPERMACHINE_MAX_CONCURRENT_SPAWNS={v:?}; \
                 using the host-parallelism default"
            );
            host_default()
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    /// A gate with a cap of zero hands out unheld permits and counts nothing.
    #[test]
    fn disabled_gate_is_a_noop() {
        let spawn_gate = SpawnGate::with_cap_for_test(0, DEFAULT_TIMEOUT);
        let p1 = acquire_on(spawn_gate);
        let p2 = acquire_on(spawn_gate);
        assert!(!p1.is_held() && !p2.is_held());
        assert_eq!(spawn_gate.in_flight(), 0);
    }

    /// Each permit occupies one slot and gives it back when dropped.
    #[test]
    fn caps_in_flight_and_releases_on_drop() {
        let spawn_gate = SpawnGate::with_cap_for_test(2, DEFAULT_TIMEOUT);
        let p1 = acquire_on(spawn_gate);
        let p2 = acquire_on(spawn_gate);
        assert_eq!(spawn_gate.in_flight(), 2);
        assert!(p1.is_held() && p2.is_held());

        drop(p1);
        assert_eq!(spawn_gate.in_flight(), 1);

        drop(p2);
        assert_eq!(spawn_gate.in_flight(), 0);
    }

    /// With the gate full, another caller stays blocked until a permit drops.
    #[test]
    fn third_spawn_blocks_until_a_permit_frees() {
        let spawn_gate = SpawnGate::with_cap_for_test(2, DEFAULT_TIMEOUT);
        let first = acquire_on(spawn_gate);
        let _second = acquire_on(spawn_gate);
        assert_eq!(spawn_gate.in_flight(), 2);

        // Flipped to 1 by the worker as soon as it gets through the gate.
        let admitted = Arc::new(AtomicU64::new(0));
        let worker_flag = Arc::clone(&admitted);
        let worker = std::thread::spawn(move || {
            let _third = acquire_on(spawn_gate);
            worker_flag.store(1, Ordering::SeqCst);
            std::thread::sleep(Duration::from_millis(20));
        });

        // Give the worker ample time to reach the gate and start waiting.
        std::thread::sleep(Duration::from_millis(50));
        assert_eq!(
            admitted.load(Ordering::SeqCst),
            0,
            "third spawn must block while cap is full"
        );

        drop(first);
        worker.join().unwrap();
        assert_eq!(admitted.load(Ordering::SeqCst), 1);
    }

    /// When nothing frees up, a waiter is admitted over the cap after the timeout.
    #[test]
    fn proceeds_after_timeout_when_no_permit_frees() {
        let spawn_gate = SpawnGate::with_cap_for_test(1, Duration::from_millis(80));
        let _holder = acquire_on(spawn_gate);

        let started = Instant::now();
        let late = acquire_on(spawn_gate);
        let waited = started.elapsed();

        assert!(
            waited >= Duration::from_millis(70),
            "should wait ~the timeout"
        );
        assert!(late.is_held(), "admitted anyway after timeout");
        assert_eq!(
            spawn_gate.in_flight(),
            2,
            "overcommitted one slot to avoid a stall"
        );
    }

    /// The computed cap never drops below the floor of 16.
    #[test]
    fn default_cap_is_generous() {
        let cap = compute_cap();
        assert!(cap >= 16, "default cap must stay generous");
    }
}