car-server-core 0.9.0

Transport-neutral library for the CAR daemon JSON-RPC dispatcher (used by car-server and tokhn-daemon)
Documentation
//! Process-wide admission control for inference RPC handlers.
//!
//! ## Why this exists
//!
//! Until this module landed, every WebSocket session shared the
//! `ServerState.inference: OnceLock<Arc<InferenceEngine>>` — which is
//! correct for *model loading* (one set of weights, not N), but the
//! engine itself queues nothing. So when N users (or N
//! parallel-FFI-spawned `car infer` calls that auto-start a daemon)
//! land at once, each call enters the engine concurrently, each
//! triggers its own KV-cache allocation, each holds activations during
//! decode, and the host RAM is overwhelmed long before any single
//! request finishes.
//!
//! The fix is a global semaphore. The number of permits is sized from
//! detected host RAM — roughly "one concurrent generation per 8 GB"
//! with floor 1 and ceiling 8. Embedders and operators can override
//! via `CAR_INFERENCE_MAX_CONCURRENT`.
//!
//! Embedded streaming RPCs (`infer_stream`, `voice.transcribe_stream`,
//! …) hold their permit for the duration of the stream; one-shot RPCs
//! release on response.
//!
//! Memory-pressure-aware *eviction* of loaded weights is a separate
//! concern handled by `car_inference::backend_cache::BackendCache`.
//! This module only gates *new admissions*. Together they form a
//! two-layer defense: admission keeps concurrent activations bounded;
//! the LRU backend cache keeps loaded weights bounded.

use std::sync::Arc;
use std::time::Duration;

use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Name of the environment variable that overrides the auto-sized
/// permit count. The value must parse (after trimming whitespace) as a
/// positive integer; anything else is ignored with a warning and
/// auto-sizing is used instead. Setting it to `1` forces full
/// serialization, which is the right answer on a small laptop running
/// a meeting bot.
pub const ENV_MAX_CONCURRENT: &str = "CAR_INFERENCE_MAX_CONCURRENT";

/// Wait time (ms) at or above which a completed acquire gets logged.
/// Tuned for "this should normally be instant; surface it when it
/// isn't." Typed `u128` so it compares directly against the value
/// returned by `Duration::as_millis`.
const SLOW_ACQUIRE_LOG_MS: u128 = 100;

/// Process-wide gate on concurrent inference requests.
///
/// Cheap to clone: every clone shares the same underlying semaphore
/// through an `Arc`, so a permit taken via one handle is unavailable to
/// all the others until it is dropped.
#[derive(Clone, Debug)]
pub struct InferenceAdmission {
    /// Shared semaphore; each permit represents one in-flight inference slot.
    sem: Arc<Semaphore>,
    /// Permit count the semaphore was created with, kept for status reporting.
    permits: usize,
}

impl InferenceAdmission {
    /// Build the controller, sizing the permit count from host RAM
    /// unless [`ENV_MAX_CONCURRENT`] is set.
    pub fn new() -> Self {
        let permits = chosen_permit_count();
        tracing::info!(
            permits,
            env = ENV_MAX_CONCURRENT,
            "inference admission controller online"
        );
        Self::with_permits(permits)
    }

    /// Build with an explicit permit count. Skips env probing — useful
    /// for embedders that already know what they want and for tests
    /// that need a deterministic cap without racing on a process-wide
    /// env var.
    pub fn with_permits(permits: usize) -> Self {
        let permits = permits.max(1);
        Self {
            sem: Arc::new(Semaphore::new(permits)),
            permits,
        }
    }

    /// Acquire a permit. Returns an owned guard whose `Drop` releases
    /// the slot — keep it alive for the full duration of the inference
    /// call (including any token streaming).
    pub async fn acquire(&self) -> OwnedSemaphorePermit {
        let started = std::time::Instant::now();
        let permit = self
            .sem
            .clone()
            .acquire_owned()
            .await
            .expect("inference admission semaphore is never closed");
        let waited_ms = started.elapsed().as_millis();
        if waited_ms >= SLOW_ACQUIRE_LOG_MS {
            tracing::info!(
                waited_ms,
                permits_total = self.permits,
                permits_available = self.sem.available_permits(),
                "inference request queued behind concurrency limit"
            );
        }
        permit
    }

    /// Try to acquire a permit without blocking. Returns `None` when
    /// every slot is busy. Handy for non-essential paths (e.g. health
    /// probes) that prefer to fail fast over queueing.
    pub fn try_acquire(&self) -> Option<OwnedSemaphorePermit> {
        self.sem.clone().try_acquire_owned().ok()
    }

    /// Acquire with an upper bound on wait time. Returns `None` on
    /// timeout. The wait time is observed at acquisition; the caller
    /// keeps the permit for as long as it likes once granted.
    pub async fn acquire_with_timeout(&self, max_wait: Duration) -> Option<OwnedSemaphorePermit> {
        match tokio::time::timeout(max_wait, self.acquire()).await {
            Ok(permit) => Some(permit),
            Err(_) => {
                tracing::warn!(
                    max_wait_ms = max_wait.as_millis() as u64,
                    permits_total = self.permits,
                    "inference admission acquire timed out"
                );
                None
            }
        }
    }

    /// Total permits configured. Useful for status surfaces (`car-host`
    /// tray, `car daemon status`) so operators can see the cap.
    pub fn permits(&self) -> usize {
        self.permits
    }

    /// Permits currently free. Snapshot — racy by definition but
    /// sufficient for status panels.
    pub fn permits_available(&self) -> usize {
        self.sem.available_permits()
    }
}

/// `Default` simply delegates to [`InferenceAdmission::new`], so it
/// reads [`ENV_MAX_CONCURRENT`] and, when no valid override is set,
/// auto-sizes from host RAM.
impl Default for InferenceAdmission {
    fn default() -> Self {
        Self::new()
    }
}

/// Decide how many inference requests may run concurrently.
///
/// An operator-supplied [`ENV_MAX_CONCURRENT`] wins when it parses
/// (after trimming) as a positive integer. Any other value is reported
/// with a warning and ignored. Without a valid override the count is
/// derived from total host RAM: one permit per 8192 MB, clamped to
/// `1..=8`.
fn chosen_permit_count() -> usize {
    // ~8 GB per concurrent generation is a conservative floor for the
    // model-size mix CAR users hit in practice (Qwen3-4B + a remote
    // model + vLLM-MLX VL); set the env var to a higher count when
    // running with smaller models or more headroom.
    const MB_PER_PERMIT: u64 = 8192;
    const MIN_AUTO_PERMITS: u64 = 1;
    const MAX_AUTO_PERMITS: u64 = 8;

    // Operator override wins.
    if let Ok(raw) = std::env::var(ENV_MAX_CONCURRENT) {
        match raw.trim().parse::<usize>() {
            Ok(n) if n >= 1 => return n,
            _ => {
                tracing::warn!(
                    value = %raw,
                    "{} must be a positive integer; ignoring and falling back to auto-sizing",
                    ENV_MAX_CONCURRENT
                );
            }
        }
    }

    // Auto-size from host RAM. The cast is lossless: the clamped value
    // is always within 1..=8.
    (host_ram_mb() / MB_PER_PERMIT).clamp(MIN_AUTO_PERMITS, MAX_AUTO_PERMITS) as usize
}

/// Report total host RAM in megabytes (bytes / 1024 / 1024).
///
/// On macOS the value comes from `sysctl -n hw.memsize`; on Linux from
/// the `MemTotal:` line of `/proc/meminfo`. When the probe fails, or on
/// any other platform, a fixed 16 GB fallback is returned so the result
/// is always positive.
fn host_ram_mb() -> u64 {
    // Final fallback: assume 16 GB so auto-sizing yields 2 permits —
    // a sane default for the laptop demographic.
    const FALLBACK_MB: u64 = 16 * 1024;

    #[cfg(target_os = "macos")]
    {
        // `hw.memsize` is reported in bytes.
        let probed_bytes = std::process::Command::new("sysctl")
            .args(["-n", "hw.memsize"])
            .output()
            .ok()
            .filter(|out| out.status.success())
            .and_then(|out| String::from_utf8(out.stdout).ok())
            .and_then(|text| text.trim().parse::<u64>().ok());
        if let Some(bytes) = probed_bytes {
            return bytes / (1024 * 1024);
        }
    }
    #[cfg(target_os = "linux")]
    {
        // The first whitespace-separated token after `MemTotal:` is the
        // size in kB; keep scanning if a matching line fails to parse.
        let probed_kb = std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|meminfo| {
                meminfo
                    .lines()
                    .filter_map(|line| line.strip_prefix("MemTotal:"))
                    .find_map(|rest| rest.split_whitespace().next()?.parse::<u64>().ok())
            });
        if let Some(kb) = probed_kb {
            return kb / 1024;
        }
    }
    FALLBACK_MB
}

#[cfg(test)]
mod tests {
    // Every test builds its controller through `with_permits` so the
    // cap is deterministic. `new()` consults the process-global
    // `CAR_INFERENCE_MAX_CONCURRENT` env var, which would race when
    // cargo test runs sibling tests in parallel.
    use super::*;

    /// A requested cap of zero is raised to one.
    #[tokio::test]
    async fn permits_clamps_to_at_least_one() {
        let gate = InferenceAdmission::with_permits(0);
        assert_eq!(gate.permits(), 1);
    }

    /// With the only permit held, the non-blocking path reports `None`.
    #[tokio::test]
    async fn try_acquire_returns_none_when_full() {
        let gate = InferenceAdmission::with_permits(1);
        let _occupied = gate.acquire().await;
        assert!(gate.try_acquire().is_none());
    }

    /// With the only permit held, the bounded wait gives up after
    /// roughly the requested timeout.
    #[tokio::test]
    async fn acquire_with_timeout_returns_none_on_full_queue() {
        let gate = InferenceAdmission::with_permits(1);
        let _occupied = gate.acquire().await;
        let clock = std::time::Instant::now();
        let outcome = gate.acquire_with_timeout(Duration::from_millis(50)).await;
        let waited = clock.elapsed();
        assert!(outcome.is_none());
        assert!(waited >= Duration::from_millis(45));
    }

    /// The free-permit snapshot drops by one for each guard held.
    #[tokio::test]
    async fn permits_available_reflects_outstanding_holds() {
        let gate = InferenceAdmission::with_permits(2);
        assert_eq!(gate.permits_available(), 2);
        let _first = gate.acquire().await;
        assert_eq!(gate.permits_available(), 1);
        let _second = gate.acquire().await;
        assert_eq!(gate.permits_available(), 0);
    }

    /// Even on the fallback path the probe must report >0 so
    /// `chosen_permit_count` produces a usable cap.
    #[test]
    fn host_ram_mb_returns_a_positive_value() {
        assert!(host_ram_mb() > 0);
    }
}