car-inference 0.29.0

Local model inference for CAR — Candle backend with Qwen3 models
//! On-demand pool of supervised `vllm-mlx` server processes.
//!
//! The engine manages these exactly like its other backends: lazy-started on the
//! first request for a `vllm-mlx/*` model, health-waited before routing, and
//! idle-evicted by the same loop that reaps in-process backends. The point is
//! that a server-backed (multimodal / unsupported-arch) model is
//! indistinguishable from an in-process one to the caller — no manual server to
//! start, no endpoint to configure.
//!
//! `vllm-mlx serve <model> --port <P>` serves **one** model per process, so the
//! pool keys a process per model id, each on a CAR-allocated free loopback port.
//! Teardown relies on `kill_on_drop`: removing a [`ManagedServer`] from the map
//! SIGKILLs its child.

use std::collections::HashMap;
use std::net::TcpListener;
use std::path::Path;
use std::process::Stdio;
use std::time::{Duration, Instant};

use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tracing::{info, warn};

use crate::vllm_runtime;

/// Timeout handed to each individual `/health` probe while `wait_ready` polls a
/// freshly-spawned server.
const HEALTH_TIMEOUT: Duration = Duration::from_secs(3);
/// How long to wait for a freshly-spawned server to become healthy. A cold model
/// load (weights off disk into the GPU) can take tens of seconds.
/// `wait_ready` gives up with an error once this much time has elapsed.
const READY_DEADLINE: Duration = Duration::from_secs(120);
/// Poll cadence while waiting for readiness: the sleep between consecutive
/// probe attempts in `wait_ready`.
const READY_POLL: Duration = Duration::from_millis(500);

/// One supervised `vllm-mlx serve` process together with the bookkeeping the
/// pool keeps for it.
struct ManagedServer {
    /// The `vllm-mlx serve` child. `kill_on_drop` is set, so dropping this struct
    /// terminates the server.
    child: Child,
    /// Loopback TCP port passed to the server via `--port`; also used to build
    /// the endpoint URL.
    port: u16,
    /// When the pool last handed this server out; compared against the pool's
    /// idle TTL during eviction.
    last_used: Instant,
}

impl ManagedServer {
    /// Base URL of this server on the loopback interface.
    fn endpoint(&self) -> String {
        let port = self.port;
        format!("http://127.0.0.1:{port}")
    }

    /// Whether the child process is still running.
    ///
    /// Polls the child without blocking. Both "already exited" and "could not
    /// query the child" are reported as not alive.
    fn is_alive(&mut self) -> bool {
        match self.child.try_wait() {
            Ok(None) => true,
            Ok(Some(_)) | Err(_) => false,
        }
    }
}

/// On-demand pool of supervised `vllm-mlx` servers, one per model id.
pub struct VllmServerPool {
    /// Running servers keyed by model id. The async mutex is held for the whole
    /// of `ensure` (including the readiness wait) and of `evict_idle`.
    servers: Mutex<HashMap<String, ManagedServer>>,
    /// Servers whose `last_used` is older than this are stopped by `evict_idle`.
    idle_ttl: Duration,
}

impl VllmServerPool {
    /// Create an empty pool. Servers left unused for longer than `idle_ttl` are
    /// stopped by [`Self::evict_idle`].
    pub fn new(idle_ttl: Duration) -> Self {
        Self {
            servers: Mutex::new(HashMap::new()),
            idle_ttl,
        }
    }

    /// Ensure a healthy server for `model_id` (whose backing HF model is
    /// `runtime_model`) and return its loopback endpoint. Starts the process and
    /// health-waits on first use; subsequent calls bump the idle timer and return
    /// the cached endpoint. A crashed server is respawned transparently.
    ///
    /// NB: a single lock serializes starts across models — multimodal traffic is
    /// occasional, so the simplicity is worth more than concurrent cold-starts.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the runtime is unavailable, no
    /// local port could be allocated, the process could not be spawned, or the
    /// server exited / failed to become healthy during startup. In the latter
    /// cases the child is killed (via `kill_on_drop`) before returning.
    pub async fn ensure(&self, model_id: &str, runtime_model: &str) -> Result<String, String> {
        let mut servers = self.servers.lock().await;

        if let Some(existing) = servers.get_mut(model_id) {
            if existing.is_alive() {
                existing.last_used = Instant::now();
                return Ok(existing.endpoint());
            }
            warn!(model = model_id, "supervised vllm-mlx server died; respawning");
            // Dropping the stale entry also reaps/kills whatever is left of it.
            servers.remove(model_id);
        }

        let runtime = vllm_runtime::ensure_runtime()
            .await
            .map_err(|e| format!("vllm-mlx runtime unavailable: {e}"))?;
        let port = alloc_loopback_port()?;

        info!(
            model = model_id,
            runtime_model, port, "starting supervised vllm-mlx server"
        );
        let child = spawn_server(&runtime.server, runtime_model, port)
            .map_err(|e| format!("failed to spawn vllm-mlx serve: {e}"))?;
        let mut server = ManagedServer {
            child,
            port,
            last_used: Instant::now(),
        };
        // Single source of truth for the URL shape: the same accessor the
        // cached path above uses.
        let endpoint = server.endpoint();

        // On failure the early return drops `server`, which kills the child via
        // kill_on_drop.
        wait_ready(&endpoint, &mut server).await?;

        // Start the idle clock at readiness rather than at spawn: a cold start
        // can take a long time, and counting it against the TTL could get a
        // server evicted before its first request ever reaches it.
        server.last_used = Instant::now();

        info!(model = model_id, endpoint = %endpoint, "vllm-mlx server ready");
        servers.insert(model_id.to_string(), server);
        Ok(endpoint)
    }

    /// Stop servers idle longer than the TTL (and reap any that have died).
    /// Returns the number stopped. Called from the engine's idle-eviction loop.
    pub async fn evict_idle(&self) -> usize {
        let mut servers = self.servers.lock().await;
        let now = Instant::now();
        let ttl = self.idle_ttl;
        let before = servers.len();

        // Entries rejected by `retain` are dropped in place, and dropping a
        // `ManagedServer` kills its child via kill_on_drop.
        servers.retain(|model, server| {
            if !server.is_alive() {
                // Not an idle eviction: the process is already gone.
                warn!(model = %model, "reaping dead vllm-mlx server");
                return false;
            }
            if now.duration_since(server.last_used) > ttl {
                info!(model = %model, "evicting idle vllm-mlx server");
                return false;
            }
            true
        });

        before - servers.len()
    }

    /// Number of currently-managed servers (for status/telemetry).
    pub async fn len(&self) -> usize {
        self.servers.lock().await.len()
    }

    /// True when the pool currently manages no servers.
    pub async fn is_empty(&self) -> bool {
        self.servers.lock().await.is_empty()
    }
}

/// Pick a currently-free loopback TCP port.
///
/// Binds `127.0.0.1:0` so the OS chooses an ephemeral port, reads the chosen
/// port back, and releases the socket when the listener goes out of scope at
/// the end of the function. The port is therefore only *probably* free by the
/// time the server binds it — an unavoidable TOCTOU window that is harmless in
/// practice because the OS doesn't immediately recycle ephemeral ports.
///
/// # Errors
///
/// Returns a message if the bind fails or the bound address cannot be read.
fn alloc_loopback_port() -> Result<u16, String> {
    let probe = match TcpListener::bind("127.0.0.1:0") {
        Ok(listener) => listener,
        Err(e) => return Err(format!("could not allocate a local port: {e}")),
    };
    match probe.local_addr() {
        Ok(addr) => Ok(addr.port()),
        Err(e) => Err(format!("could not read allocated port: {e}")),
    }
}

/// Launch `<server_bin> serve <runtime_model> --port <port>` as a child process.
///
/// stdin is closed; stdout/stderr are wired to whatever `log_sinks` provides for
/// this port (log files under `~/.car/logs` when resolvable, else discarded).
/// The child is configured with `kill_on_drop`, so dropping the returned handle
/// terminates the server.
fn spawn_server(server_bin: &Path, runtime_model: &str, port: u16) -> std::io::Result<Child> {
    let (stdout_sink, stderr_sink) = log_sinks(port);
    let port_arg = port.to_string();

    let mut cmd = Command::new(server_bin);
    cmd.arg("serve");
    cmd.arg(runtime_model);
    cmd.arg("--port");
    cmd.arg(port_arg);
    cmd.stdin(Stdio::null());
    cmd.stdout(stdout_sink);
    cmd.stderr(stderr_sink);
    cmd.kill_on_drop(true);
    cmd.spawn()
}

/// Build the `(stdout, stderr)` sinks for the server listening on `port`.
///
/// Log files live in `$HOME/.car/logs` and carry the port in their name so
/// concurrent servers never write to the same file; they are opened in append
/// mode and created on demand. If `HOME` is unset, the directory can't be
/// created, or either file can't be opened, both streams are discarded instead.
fn log_sinks(port: u16) -> (Stdio, Stdio) {
    let discard = || (Stdio::null(), Stdio::null());

    let Some(home) = std::env::var_os("HOME") else {
        return discard();
    };
    let log_dir = Path::new(&home).join(".car").join("logs");
    if std::fs::create_dir_all(&log_dir).is_err() {
        return discard();
    }

    let open_sink = |suffix: &str| -> Option<Stdio> {
        let path = log_dir.join(format!("vllm-mlx-{port}.{suffix}.log"));
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .ok()?;
        Some(Stdio::from(file))
    };

    // Both files are attempted (stdout first) before deciding; it's all or nothing.
    match (open_sink("stdout"), open_sink("stderr")) {
        (Some(out), Some(err)) => (out, err),
        _ => discard(),
    }
}

/// Poll `/health` until the server is ready or the deadline elapses. Aborts early
/// (with a precise error) if the child exits during startup.
///
/// Each iteration probes `endpoint` (bounded by `HEALTH_TIMEOUT`), then checks
/// that the supervised child is still running *before* trusting the probe
/// result, and finally sleeps `READY_POLL` if neither outcome was reached.
///
/// # Errors
///
/// Returns a message if the child has exited, or if the server has not become
/// healthy once `READY_DEADLINE` has elapsed.
async fn wait_ready(endpoint: &str, server: &mut ManagedServer) -> Result<(), String> {
    let start = Instant::now();
    loop {
        let healthy = vllm_runtime::health_ok(endpoint, HEALTH_TIMEOUT).await;

        // Liveness outranks the probe: the port was chosen with a TOCTOU window,
        // so a 200 from `endpoint` only means *something* is listening there. If
        // our own child is gone, that something is not the server we supervise.
        if !server.is_alive() {
            return Err(format!(
                "vllm-mlx server exited during startup (see ~/.car/logs/vllm-mlx-{}.stderr.log)",
                server.port
            ));
        }
        if healthy {
            return Ok(());
        }
        if start.elapsed() > READY_DEADLINE {
            return Err(format!(
                "vllm-mlx server did not become healthy at {endpoint} within {}s",
                READY_DEADLINE.as_secs()
            ));
        }
        tokio::time::sleep(READY_POLL).await;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Two consecutive allocations both yield a non-zero port, and the first one
    /// can be bound again afterwards. Note that, despite the test's name, the two
    /// ports are not asserted to differ from each other.
    #[test]
    fn alloc_port_returns_distinct_usable_ports() {
        let a = alloc_loopback_port().unwrap();
        let b = alloc_loopback_port().unwrap();
        assert_ne!(a, 0);
        assert_ne!(b, 0);
        // Re-binding the freed port must succeed (it was released).
        assert!(TcpListener::bind(("127.0.0.1", a)).is_ok());
    }

    /// Evicting from a pool that never started a server stops nothing and leaves
    /// the pool empty.
    #[tokio::test]
    async fn evict_idle_on_empty_pool_is_zero() {
        let pool = VllmServerPool::new(Duration::from_secs(300));
        assert_eq!(pool.evict_idle().await, 0);
        assert_eq!(pool.len().await, 0);
    }

    /// End-to-end mechanics — spawn a stand-in `serve <model> --port <P>` process
    /// that answers `/health`, then prove the pool's spawn + readiness-wait reach
    /// "healthy" and that `kill_on_drop` tears it down. Uses a tiny Python HTTP
    /// server so it doesn't depend on the real vllm-mlx runtime or any weights.
    #[tokio::test]
    async fn spawns_and_health_waits_a_stand_in_server() {
        // Skip (rather than fail) on machines without a python3 interpreter.
        let Some(python) = vllm_runtime::which("python3") else {
            eprintln!("SKIP: python3 not available");
            return;
        };
        // A fake `vllm-mlx`: `serve <model> --port <P>` → HTTP 200 on every GET.
        // The scratch directory name includes this process's id.
        let dir = std::env::temp_dir().join(format!("car-vllm-pool-test-{}", std::process::id()));
        std::fs::create_dir_all(&dir).unwrap();
        let script = dir.join("fake-vllm-mlx");
        // The script's shebang points at the interpreter found above; it reads the
        // port from the argument following `--port` and serves on 127.0.0.1.
        std::fs::write(
            &script,
            format!(
                "#!{}\n\
                 import sys, http.server\n\
                 port = int(sys.argv[sys.argv.index('--port') + 1])\n\
                 class H(http.server.BaseHTTPRequestHandler):\n\
                 \x20   def do_GET(self):\n\
                 \x20       self.send_response(200); self.end_headers(); self.wfile.write(b'ok')\n\
                 \x20   def log_message(self, *a): pass\n\
                 http.server.HTTPServer(('127.0.0.1', port), H).serve_forever()\n",
                python.display()
            ),
        )
        .unwrap();
        // Mark the script executable so it can be spawned directly (unix only).
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
        }

        // Drive the same helpers the pool uses: allocate a port, spawn, wrap.
        let port = alloc_loopback_port().unwrap();
        let child = spawn_server(&script, "dummy/model", port).expect("spawn");
        let mut server = ManagedServer {
            child,
            port,
            last_used: Instant::now(),
        };
        let endpoint = server.endpoint();

        wait_ready(&endpoint, &mut server)
            .await
            .expect("stand-in server should become healthy");
        // Once ready, a direct probe succeeds and the child is still running.
        assert!(vllm_runtime::health_ok(&endpoint, Duration::from_secs(2)).await);
        assert!(server.is_alive());

        // Dropping the server relies on kill_on_drop to tear the child down; the
        // teardown itself is not asserted here. Cleanup of the scratch directory
        // is best-effort.
        drop(server);
        let _ = std::fs::remove_dir_all(&dir);
    }
}