supermachine 0.7.68

Run any OCI/Docker image as a hardware-isolated microVM on macOS HVF (Linux KVM and Windows WHP in progress). Single library API, zero flags for the common case, sub-100 ms cold-restore from snapshot.
Documentation
//! Process-wide spawn-concurrency gate (boot-rate limiter).
//!
//! Booting / restoring a microVM is thread- and CPU-heavy (vCPU threads,
//! kernel restore, device init). When a pool warms many workers at once,
//! a storm of *concurrent boots* can exhaust host thread / process limits
//! — observed under load as `smpark_park: Resource temporarily
//! unavailable` (EAGAIN) during restore. This gate caps how many spawns
//! are IN FLIGHT (booting) at once.
//!
//! Crucially it does NOT cap how many VMs run: the permit is held only
//! for the duration of `spawn_one` (the boot + handshake) and released
//! the moment the worker is built. So steady-state VM density and the
//! acquire / exec / dataplane hot paths are completely untouched — only
//! the boot *rate* during a burst is smoothed. A pool that wants 64 warm
//! workers still gets 64; they just boot in waves instead of forking all
//! at once (which is faster anyway on a bounded core count, and avoids
//! the EAGAIN cliff).
//!
//! Cap = `SUPERMACHINE_MAX_CONCURRENT_SPAWNS` if set (`0` = disabled),
//! else `max(16, 4 × host parallelism)` — generous enough that a normal
//! warmup never blocks, tight enough to defuse a pathological storm.

use std::sync::{Condvar, Mutex, OnceLock};
use std::time::{Duration, Instant};

/// Forward-progress valve: if a wedged boot holds its permit longer than
/// this, a waiter proceeds anyway (overcommitting one slot) rather than
/// stalling spawns indefinitely. Boots are bounded by the control-handshake
/// deadline in practice, so this should never fire in normal operation.
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);

struct Inner {
    in_flight: usize,
}

/// Counting-semaphore gate over concurrent in-flight spawns.
pub struct SpawnGate {
    inner: Mutex<Inner>,
    cv: Condvar,
    /// Max concurrent boots; `0` = disabled (unlimited).
    cap: usize,
    timeout: Duration,
}

static GATE: OnceLock<SpawnGate> = OnceLock::new();

/// The process-wide spawn gate (cap probed once from env / host parallelism).
pub fn gate() -> &'static SpawnGate {
    GATE.get_or_init(|| SpawnGate::new(compute_cap(), DEFAULT_TIMEOUT))
}

/// RAII permit for one in-flight spawn. Frees the slot on drop. Hold it
/// ONLY for the duration of the boot (a local in `spawn_one`), so the gate
/// caps concurrent boots, not running VMs. `held == false` when the gate
/// is disabled — a zero-cost no-op permit.
#[must_use = "dropping the permit immediately frees the spawn slot"]
pub struct SpawnPermit {
    gate: &'static SpawnGate,
    held: bool,
}

impl SpawnPermit {
    /// Whether this permit actually occupies a slot (false when disabled).
    pub fn is_held(&self) -> bool {
        self.held
    }
}

impl Drop for SpawnPermit {
    fn drop(&mut self) {
        if self.held {
            self.gate.release();
        }
    }
}

impl std::fmt::Debug for SpawnPermit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SpawnPermit")
            .field("held", &self.held)
            .finish()
    }
}

/// Block until an in-flight-spawn slot is free, then occupy it. The
/// returned permit frees the slot on drop. Disabled (cap 0) → returns a
/// no-op permit immediately.
pub fn acquire() -> SpawnPermit {
    acquire_on(gate())
}

fn acquire_on(gate: &'static SpawnGate) -> SpawnPermit {
    let held = gate.reserve_blocking();
    SpawnPermit { gate, held }
}

impl SpawnGate {
    fn new(cap: usize, timeout: Duration) -> Self {
        SpawnGate {
            inner: Mutex::new(Inner { in_flight: 0 }),
            cv: Condvar::new(),
            cap,
            timeout,
        }
    }

    /// Configured cap (`0` = disabled).
    pub fn cap(&self) -> usize {
        self.cap
    }

    /// Current count of in-flight spawns (diagnostics / tests).
    pub fn in_flight(&self) -> usize {
        self.inner.lock().unwrap().in_flight
    }

    /// Occupy a slot, blocking until one is free. Returns `true` if a slot
    /// was taken (so the permit releases on drop), `false` when disabled.
    /// Never blocks forever — proceeds after the timeout (a wedged boot
    /// can't stall the pool).
    fn reserve_blocking(&self) -> bool {
        if self.cap == 0 {
            return false;
        }
        let mut g = self.inner.lock().unwrap();
        let mut waited_from: Option<Instant> = None;
        loop {
            if g.in_flight < self.cap {
                g.in_flight += 1;
                return true;
            }
            let t0 = *waited_from.get_or_insert_with(Instant::now);
            let elapsed = t0.elapsed();
            if elapsed >= self.timeout {
                eprintln!(
                    "[spawn-gate] waited {:?} for a spawn slot ({}/{} boots in flight) — \
                     proceeding anyway to avoid a stall (set \
                     SUPERMACHINE_MAX_CONCURRENT_SPAWNS=0 to disable the gate)",
                    elapsed, g.in_flight, self.cap
                );
                g.in_flight += 1;
                return true;
            }
            let (ng, _) = self.cv.wait_timeout(g, self.timeout - elapsed).unwrap();
            g = ng;
        }
    }

    fn release(&self) {
        {
            let mut g = self.inner.lock().unwrap();
            g.in_flight = g.in_flight.saturating_sub(1);
        }
        // Exactly one slot freed → wake one waiter.
        self.cv.notify_one();
    }

    #[cfg(test)]
    fn with_cap_for_test(cap: usize, timeout: Duration) -> &'static SpawnGate {
        Box::leak(Box::new(SpawnGate::new(cap, timeout)))
    }
}

/// Compute the cap once: explicit env override (incl. `0` = disabled)
/// wins; otherwise a generous multiple of host parallelism.
fn compute_cap() -> usize {
    if let Ok(v) = std::env::var("SUPERMACHINE_MAX_CONCURRENT_SPAWNS") {
        let t = v.trim();
        if let Ok(n) = t.parse::<usize>() {
            return n;
        }
        eprintln!(
            "[spawn-gate] ignoring unparseable SUPERMACHINE_MAX_CONCURRENT_SPAWNS={v:?}; \
             using the host-parallelism default"
        );
    }
    let par = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(8);
    par.saturating_mul(4).max(16)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    #[test]
    fn disabled_gate_is_a_noop() {
        let g = SpawnGate::with_cap_for_test(0, DEFAULT_TIMEOUT);
        let p1 = acquire_on(g);
        let p2 = acquire_on(g);
        assert!(!p1.is_held() && !p2.is_held());
        assert_eq!(g.in_flight(), 0);
    }

    #[test]
    fn caps_in_flight_and_releases_on_drop() {
        let g = SpawnGate::with_cap_for_test(2, DEFAULT_TIMEOUT);
        let p1 = acquire_on(g);
        let p2 = acquire_on(g);
        assert_eq!(g.in_flight(), 2);
        assert!(p1.is_held() && p2.is_held());
        drop(p1);
        assert_eq!(g.in_flight(), 1);
        drop(p2);
        assert_eq!(g.in_flight(), 0);
    }

    #[test]
    fn third_spawn_blocks_until_a_permit_frees() {
        let g = SpawnGate::with_cap_for_test(2, DEFAULT_TIMEOUT);
        let p1 = acquire_on(g);
        let _p2 = acquire_on(g);
        assert_eq!(g.in_flight(), 2);

        let admitted = Arc::new(AtomicU64::new(0));
        let admitted_t = Arc::clone(&admitted);
        let h = std::thread::spawn(move || {
            let _p3 = acquire_on(g); // cap full → must block
            admitted_t.store(1, Ordering::SeqCst);
            std::thread::sleep(std::time::Duration::from_millis(20));
        });
        std::thread::sleep(std::time::Duration::from_millis(50));
        assert_eq!(
            admitted.load(Ordering::SeqCst),
            0,
            "third spawn must block while cap is full"
        );
        drop(p1); // frees a slot → blocked spawn proceeds
        h.join().unwrap();
        assert_eq!(admitted.load(Ordering::SeqCst), 1);
    }

    #[test]
    fn proceeds_after_timeout_when_no_permit_frees() {
        // Cap 1, nothing released: a second spawn must NOT hang — after the
        // short timeout it proceeds (overcommitting), guaranteeing progress.
        let g = SpawnGate::with_cap_for_test(1, Duration::from_millis(80));
        let _p1 = acquire_on(g);
        let t0 = Instant::now();
        let p2 = acquire_on(g);
        assert!(
            t0.elapsed() >= Duration::from_millis(70),
            "should wait ~the timeout"
        );
        assert!(p2.is_held(), "admitted anyway after timeout");
        assert_eq!(g.in_flight(), 2, "overcommitted one slot to avoid a stall");
    }

    #[test]
    fn default_cap_is_generous() {
        // Guard against a future change that throttles normal warmups.
        assert!(compute_cap() >= 16, "default cap must stay generous");
    }
}