use std::collections::HashMap;
use std::net::TcpListener;
use std::path::Path;
use std::process::Stdio;
use std::time::{Duration, Instant};
use tokio::process::{Child, Command};
use tokio::sync::Mutex;
use tracing::{info, warn};
use crate::vllm_runtime;
/// Timeout handed to `vllm_runtime::health_ok` for each readiness probe in `wait_ready`.
const HEALTH_TIMEOUT: Duration = Duration::from_secs(3);
/// How long `wait_ready` keeps polling before reporting that the server never became healthy.
/// The deadline is checked after each probe, so the total wait can overshoot by one probe.
const READY_DEADLINE: Duration = Duration::from_secs(120);
/// Pause between consecutive readiness probes in `wait_ready`.
const READY_POLL: Duration = Duration::from_millis(500);
/// One server process owned by the pool, together with the loopback port it was started on.
struct ManagedServer {
/// Handle to the spawned process. `spawn_server` enables `kill_on_drop`, so dropping this
/// value also kills the process.
child: Child,
/// Loopback port passed to the server via `--port`.
port: u16,
/// Set when the server is created and refreshed each time `VllmServerPool::ensure` hands it
/// out; `evict_idle` compares it against the pool's idle TTL.
last_used: Instant,
}
impl ManagedServer {
    /// Base URL of this server on the loopback interface.
    fn endpoint(&self) -> String {
        let port = self.port;
        format!("http://127.0.0.1:{port}")
    }

    /// Reports whether the child process is still running.
    ///
    /// Only `Ok(None)` from `try_wait` counts as alive; an exit status or an
    /// error while querying the process both count as not alive.
    fn is_alive(&mut self) -> bool {
        match self.child.try_wait() {
            Ok(None) => true,
            Ok(Some(_)) | Err(_) => false,
        }
    }
}
/// Pool of supervised vllm-mlx server processes, at most one per model id.
pub struct VllmServerPool {
/// Running servers keyed by the `model_id` passed to `ensure`, guarded by an async mutex.
servers: Mutex<HashMap<String, ManagedServer>>,
/// Servers whose `last_used` is older than this are removed by `evict_idle`.
idle_ttl: Duration,
}
impl VllmServerPool {
    /// Creates an empty pool whose servers are evicted by [`Self::evict_idle`]
    /// once they have gone unused for longer than `idle_ttl`.
    pub fn new(idle_ttl: Duration) -> Self {
        Self {
            servers: Mutex::new(HashMap::new()),
            idle_ttl,
        }
    }

    /// Returns the endpoint of a running server for `model_id`, starting one
    /// (serving `runtime_model`) if none is running.
    ///
    /// A server that is already tracked and still alive is reused and its
    /// `last_used` timestamp refreshed. A tracked server that has exited is
    /// dropped and replaced.
    ///
    /// The pool lock is held for the whole call, including the readiness wait,
    /// so concurrent `ensure` / `evict_idle` / `len` calls wait until the
    /// startup finishes or fails. This also guarantees that at most one server
    /// is spawned per model id.
    ///
    /// # Errors
    ///
    /// Returns a human-readable message when the runtime is unavailable, no
    /// loopback port can be allocated, the process cannot be spawned, or the
    /// server does not become ready.
    pub async fn ensure(&self, model_id: &str, runtime_model: &str) -> Result<String, String> {
        let mut servers = self.servers.lock().await;
        if let Some(s) = servers.get_mut(model_id) {
            if s.is_alive() {
                s.last_used = Instant::now();
                return Ok(s.endpoint());
            }
            warn!(model = model_id, "supervised vllm-mlx server died; respawning");
            servers.remove(model_id);
        }

        let runtime = vllm_runtime::ensure_runtime()
            .await
            .map_err(|e| format!("vllm-mlx runtime unavailable: {e}"))?;
        let port = alloc_loopback_port()?;
        info!(
            model = model_id,
            runtime_model, port, "starting supervised vllm-mlx server"
        );
        let child = spawn_server(&runtime.server, runtime_model, port)
            .map_err(|e| format!("failed to spawn vllm-mlx serve: {e}"))?;
        let mut server = ManagedServer {
            child,
            port,
            last_used: Instant::now(),
        };
        // Single source of truth for the URL shape.
        let endpoint = server.endpoint();
        // On failure `server` is dropped here; `spawn_server` sets
        // `kill_on_drop`, so a half-started process is not left behind.
        wait_ready(&endpoint, &mut server).await?;
        info!(model = model_id, endpoint = %endpoint, "vllm-mlx server ready");
        servers.insert(model_id.to_string(), server);
        Ok(endpoint)
    }

    /// Removes every server that has been idle longer than the pool's TTL or
    /// whose process has exited, and returns how many were removed.
    ///
    /// Removed servers are dropped, which kills any process still running
    /// (`spawn_server` sets `kill_on_drop`).
    pub async fn evict_idle(&self) -> usize {
        let mut servers = self.servers.lock().await;
        let now = Instant::now();
        let ttl = self.idle_ttl;
        let before = servers.len();
        servers.retain(|model, server| {
            if now.duration_since(server.last_used) > ttl {
                info!(model = %model, "evicting idle vllm-mlx server");
                false
            } else if !server.is_alive() {
                // Not idle, just gone: say so instead of calling it "idle".
                warn!(model = %model, "evicting exited vllm-mlx server");
                false
            } else {
                true
            }
        });
        before - servers.len()
    }

    /// Number of servers currently tracked by the pool.
    pub async fn len(&self) -> usize {
        self.servers.lock().await.len()
    }
}
/// Asks the OS for a free loopback port by binding `127.0.0.1:0` and reading
/// back the assigned port.
///
/// The probe listener is dropped when this function returns, so the port is
/// released again before the caller uses it.
///
/// # Errors
///
/// Returns a message if the bind fails or the bound address cannot be read.
fn alloc_loopback_port() -> Result<u16, String> {
    match TcpListener::bind("127.0.0.1:0") {
        Err(e) => Err(format!("could not allocate a local port: {e}")),
        Ok(probe) => match probe.local_addr() {
            Ok(addr) => Ok(addr.port()),
            Err(e) => Err(format!("could not read allocated port: {e}")),
        },
    }
}
/// Launches `server_bin serve <runtime_model> --port <port>` as a child process.
///
/// stdin is closed, stdout/stderr go to the sinks chosen by `log_sinks`, and
/// the child is configured to be killed when its handle is dropped.
///
/// # Errors
///
/// Returns the I/O error from spawning the process.
fn spawn_server(server_bin: &Path, runtime_model: &str, port: u16) -> std::io::Result<Child> {
    let (stdout_sink, stderr_sink) = log_sinks(port);
    let port_arg = port.to_string();

    let mut cmd = Command::new(server_bin);
    cmd.args(["serve", runtime_model, "--port", port_arg.as_str()])
        .stdin(Stdio::null())
        .stdout(stdout_sink)
        .stderr(stderr_sink)
        .kill_on_drop(true);
    cmd.spawn()
}
/// Picks the stdout/stderr destinations for the server started on `port`.
///
/// When `$HOME` is set and `$HOME/.car/logs` can be created, both streams are
/// appended to `vllm-mlx-<port>.stdout.log` / `vllm-mlx-<port>.stderr.log` in
/// that directory. If any step fails, both streams are discarded.
fn log_sinks(port: u16) -> (Stdio, Stdio) {
    let discard = || (Stdio::null(), Stdio::null());

    let Some(home) = std::env::var_os("HOME") else {
        return discard();
    };
    let log_dir = Path::new(&home).join(".car").join("logs");
    if std::fs::create_dir_all(&log_dir).is_err() {
        return discard();
    }

    let open_log = |stream: &str| -> Option<Stdio> {
        let path = log_dir.join(format!("vllm-mlx-{port}.{stream}.log"));
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)
            .ok()?;
        Some(Stdio::from(file))
    };

    // Both files are opened before deciding; it is all-or-nothing.
    let stdout_sink = open_log("stdout");
    let stderr_sink = open_log("stderr");
    match (stdout_sink, stderr_sink) {
        (Some(out), Some(err)) => (out, err),
        _ => discard(),
    }
}
/// Polls `endpoint` until the supervised server answers its health check.
///
/// Each round probes health (with `HEALTH_TIMEOUT`), then confirms the child
/// process is still running before trusting the probe. Rounds are separated by
/// `READY_POLL`, and the loop gives up once `READY_DEADLINE` has elapsed.
///
/// # Errors
///
/// Returns a message if the child process exits during startup, or if the
/// endpoint is still unhealthy after `READY_DEADLINE`.
async fn wait_ready(endpoint: &str, server: &mut ManagedServer) -> Result<(), String> {
    let start = Instant::now();
    loop {
        let healthy = vllm_runtime::health_ok(endpoint, HEALTH_TIMEOUT).await;
        // Check liveness before believing the probe. The port was obtained by
        // probing and releasing a socket, so another process may be listening
        // on it; a healthy reply while our child is dead is not our server.
        if !server.is_alive() {
            return Err(format!(
                "vllm-mlx server exited during startup (see ~/.car/logs/vllm-mlx-{}.stderr.log)",
                server.port
            ));
        }
        if healthy {
            return Ok(());
        }
        if start.elapsed() > READY_DEADLINE {
            return Err(format!(
                "vllm-mlx server did not become healthy at {endpoint} within {}s",
                READY_DEADLINE.as_secs()
            ));
        }
        tokio::time::sleep(READY_POLL).await;
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Two allocations must each yield a non-zero port, and the first port must be
// bindable again afterwards.
// NOTE(review): despite the name, this never asserts `a != b`.
#[test]
fn alloc_port_returns_distinct_usable_ports() {
let a = alloc_loopback_port().unwrap();
let b = alloc_loopback_port().unwrap();
assert_ne!(a, 0);
assert_ne!(b, 0);
assert!(TcpListener::bind(("127.0.0.1", a)).is_ok());
}
// Evicting from a freshly created pool removes nothing and leaves it empty.
#[tokio::test]
async fn evict_idle_on_empty_pool_is_zero() {
let pool = VllmServerPool::new(Duration::from_secs(300));
assert_eq!(pool.evict_idle().await, 0);
assert_eq!(pool.len().await, 0);
}
// End-to-end check of `spawn_server` + `wait_ready` against a stand-in HTTP
// server written in Python. Skipped (with a message) when python3 is not found.
#[tokio::test]
async fn spawns_and_health_waits_a_stand_in_server() {
let Some(python) = vllm_runtime::which("python3") else {
eprintln!("SKIP: python3 not available");
return;
};
// Per-process scratch directory so parallel test runs do not collide.
let dir = std::env::temp_dir().join(format!("car-vllm-pool-test-{}", std::process::id()));
std::fs::create_dir_all(&dir).unwrap();
let script = dir.join("fake-vllm-mlx");
// Stand-in server: its shebang points at the discovered python3, it reads the
// port that follows `--port` in argv, and answers every GET with 200 and `ok`.
std::fs::write(
&script,
format!(
"#!{}\n\
import sys, http.server\n\
port = int(sys.argv[sys.argv.index('--port') + 1])\n\
class H(http.server.BaseHTTPRequestHandler):\n\
\x20 def do_GET(self):\n\
\x20 self.send_response(200); self.end_headers(); self.wfile.write(b'ok')\n\
\x20 def log_message(self, *a): pass\n\
http.server.HTTPServer(('127.0.0.1', port), H).serve_forever()\n",
python.display()
),
)
.unwrap();
// The script is spawned directly, so it must be executable.
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&script, std::fs::Permissions::from_mode(0o755)).unwrap();
}
let port = alloc_loopback_port().unwrap();
let child = spawn_server(&script, "dummy/model", port).expect("spawn");
let mut server = ManagedServer {
child,
port,
last_used: Instant::now(),
};
let endpoint = server.endpoint();
wait_ready(&endpoint, &mut server)
.await
.expect("stand-in server should become healthy");
assert!(vllm_runtime::health_ok(&endpoint, Duration::from_secs(2)).await);
assert!(server.is_alive());
// Dropping the handle ends the stand-in process (`spawn_server` enables kill_on_drop).
drop(server);
// Best-effort cleanup; a failure to remove the scratch directory is ignored.
let _ = std::fs::remove_dir_all(&dir);
}
}