use serial_test::serial;
use std::collections::HashMap;
use std::process::Stdio;
use std::sync::atomic::{AtomicU16, Ordering};
use std::time::Duration;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::{Child, Command};
/// Counter backing `allocate_port`. Starting at 21000 keeps test servers
/// clear of well-known/registered ports likely to be busy on a CI host.
static NEXT_PORT: AtomicU16 = AtomicU16::new(21000);

/// Hand out a process-unique localhost port for a test server or proxy.
fn allocate_port() -> u16 {
    // `fetch_add` returns the pre-increment value, so callers receive
    // sequential ports: 21000, 21001, ...
    NEXT_PORT.fetch_add(1, Ordering::SeqCst)
}
/// Handle to a spawned `mock-vllm` child process used by these tests.
/// The process is killed (best-effort) when the handle is dropped.
struct MockServer {
    // The spawned mock-vllm process; signalled in Drop.
    child: Child,
    // Port the server actually bound (it self-assigns via `--port 0` and
    // reports the result on stdout).
    port: u16,
    // Model name passed on the command line; echoed by /stats and used in
    // chat request bodies.
    model: String,
}
impl MockServer {
    /// Spawn a mock-vllm server for `model` with no extra CLI arguments.
    async fn spawn(model: &str) -> Self {
        Self::spawn_with_args(model, &[]).await
    }

    /// Spawn the mock-vllm binary and wait (up to 5s) for it to print
    /// "READY <port>" on stdout, which reports the port it actually bound.
    ///
    /// Panics if the process cannot be spawned, never signals READY within
    /// the timeout, or prints an unparseable port.
    async fn spawn_with_args(model: &str, extra_args: &[&str]) -> Self {
        let mut cmd = Command::new(env!("CARGO_BIN_EXE_mock-vllm"));
        // `--port 0` lets the server self-assign a free port; we learn the
        // real port from the READY line, which avoids bind races between
        // concurrently running tests.
        cmd.args(["--port", "0", "--model", model, "--latency-ms", "5"])
            .args(extra_args)
            .stdout(Stdio::piped())
            // Discard stderr instead of piping it: a piped pipe that is never
            // read can fill its OS buffer and block a chatty child process.
            .stderr(Stdio::null());
        let mut child = cmd.spawn().expect("Failed to spawn mock-vllm");
        let stdout = child.stdout.take().expect("Failed to capture stdout");
        let mut reader = BufReader::new(stdout).lines();
        let port = tokio::time::timeout(Duration::from_secs(5), async {
            while let Some(line) = reader.next_line().await.expect("Failed to read stdout") {
                if let Some(port_str) = line.strip_prefix("READY ") {
                    return port_str.parse::<u16>().expect("Failed to parse port");
                }
            }
            panic!("Server never signaled READY");
        })
        .await
        .expect("Timeout waiting for server to be ready");
        Self {
            child,
            port,
            model: model.to_string(),
        }
    }

    /// Port the server is listening on.
    fn port(&self) -> u16 {
        self.port
    }

    /// POST a single-user-message chat completion and return the JSON body.
    /// Panics on transport errors or a non-JSON response.
    async fn chat(&self, message: &str) -> serde_json::Value {
        let client = reqwest::Client::new();
        let url = format!("http://localhost:{}/v1/chat/completions", self.port);
        let body = serde_json::json!({
            "model": self.model,
            "messages": [{"role": "user", "content": message}]
        });
        client
            .post(&url)
            .json(&body)
            .send()
            .await
            .expect("Request failed")
            .json()
            .await
            .expect("Failed to parse response")
    }

    /// Fetch the server's /stats JSON (fields used by the tests include
    /// "model", "sleeping", "sleep_level", and "request_count").
    async fn stats(&self) -> serde_json::Value {
        let client = reqwest::Client::new();
        let url = format!("http://localhost:{}/stats", self.port);
        client
            .get(&url)
            .send()
            .await
            .expect("Request failed")
            .json()
            .await
            .expect("Failed to parse response")
    }

    /// Ask the server to enter sleep at the given level (tests use 1 and 2).
    async fn sleep(&self, level: u8) {
        let client = reqwest::Client::new();
        let url = format!("http://localhost:{}/sleep?level={}", self.port, level);
        client
            .post(&url)
            .send()
            .await
            .expect("Sleep request failed");
    }

    /// Wake the server from any sleep level.
    async fn wake(&self) {
        let client = reqwest::Client::new();
        let url = format!("http://localhost:{}/wake_up", self.port);
        client.post(&url).send().await.expect("Wake request failed");
    }

    /// Test hook: when enabled, subsequent /sleep calls on the mock fail.
    #[allow(dead_code)]
    async fn set_fail_sleep(&self, enabled: bool) {
        let client = reqwest::Client::new();
        let url = format!("http://localhost:{}/control/fail-sleep", self.port);
        client
            .post(&url)
            .json(&serde_json::json!({ "enabled": enabled }))
            .send()
            .await
            .expect("Failed to set fail-sleep");
    }

    /// Test hook: add an artificial delay (milliseconds) to future /sleep calls.
    async fn set_sleep_delay(&self, delay_ms: u64) {
        let client = reqwest::Client::new();
        let url = format!("http://localhost:{}/control/sleep-delay", self.port);
        client
            .post(&url)
            .json(&serde_json::json!({ "delay_ms": delay_ms }))
            .send()
            .await
            .expect("Failed to set sleep-delay");
    }
}
impl Drop for MockServer {
    fn drop(&mut self) {
        // Best-effort shutdown: send the kill signal and ignore any error
        // (the child may already have exited on its own).
        self.child.start_kill().ok();
    }
}
#[tokio::test]
#[serial]
async fn test_mock_server_basic() {
    let server = MockServer::spawn("test-model").await;

    // Fresh server: reports its model, is awake, has served nothing yet.
    let before = server.stats().await;
    assert_eq!(before["model"], "test-model");
    assert_eq!(before["sleeping"], false);
    assert_eq!(before["request_count"], 0);

    // The mock echoes the user message back inside the completion text.
    let reply = server.chat("Hello!").await;
    let content = reply["choices"][0]["message"]["content"]
        .as_str()
        .unwrap();
    assert!(content.contains("Hello!"));

    // Exactly one request should have been counted.
    let after = server.stats().await;
    assert_eq!(after["request_count"], 1);
}
#[tokio::test]
#[serial]
async fn test_mock_server_sleep_wake() {
    let server = MockServer::spawn("sleepy-model").await;
    assert_eq!(server.stats().await["sleeping"], false);

    // Enter L1 sleep and confirm /stats reflects it.
    server.sleep(1).await;
    let asleep = server.stats().await;
    assert_eq!(asleep["sleeping"], true);
    assert_eq!(asleep["sleep_level"], 1);

    // After waking, the server is marked awake and serves chat again.
    server.wake().await;
    assert_eq!(server.stats().await["sleeping"], false);
    let reply = server.chat("Hello again!").await;
    assert!(reply.get("choices").is_some());
}
#[tokio::test]
#[serial]
async fn test_mock_server_l2_sleep() {
    let server = MockServer::spawn("deep-model").await;

    // Level-2 sleep should be reported verbatim by /stats.
    server.sleep(2).await;
    let asleep = server.stats().await;
    assert_eq!(asleep["sleeping"], true);
    assert_eq!(asleep["sleep_level"], 2);

    // The server must come back out of deep sleep cleanly.
    server.wake().await;
    assert_eq!(server.stats().await["sleeping"], false);
}
#[tokio::test]
#[serial]
async fn test_mock_server_rejects_while_sleeping() {
    let server = MockServer::spawn("strict-model").await;
    server.sleep(1).await;

    // Bypass the `chat` helper so we can inspect the raw status code: a
    // sleeping server must answer 503 instead of serving the completion.
    let http = reqwest::Client::new();
    let endpoint = format!("http://localhost:{}/v1/chat/completions", server.port());
    let payload = serde_json::json!({
        "model": "strict-model",
        "messages": [{"role": "user", "content": "test"}]
    });
    let resp = http.post(&endpoint).json(&payload).send().await.unwrap();
    assert_eq!(resp.status(), reqwest::StatusCode::SERVICE_UNAVAILABLE);
}
#[tokio::test]
#[serial]
async fn test_orchestrator_spawns_and_manages_process() {
    use llmux::{ModelConfig, Orchestrator, ProcessState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    // Allocate the port up front instead of inserting a placeholder 0 and
    // mutating the config afterwards via get_mut.
    let port = allocate_port();
    let mut models = HashMap::new();
    models.insert(
        "test-model".to_string(),
        ModelConfig {
            model_path: "test-model".to_string(),
            port,
            extra_args: vec![],
            sleep_level: 1,
        },
    );
    let orchestrator = Arc::new(Orchestrator::with_command(
        models,
        mock_vllm_path.to_string(),
    ));
    // Nothing is spawned until explicitly requested.
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::NotStarted)
    );
    // Starting involves a real process spawn, so bound it with a timeout.
    let result = tokio::time::timeout(
        Duration::from_secs(10),
        orchestrator.ensure_running("test-model"),
    )
    .await;
    assert!(result.is_ok(), "Timed out waiting for process to start");
    assert!(result.unwrap().is_ok(), "Failed to start process");
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: None })
    );
    // The spawned backend should answer chat requests directly.
    let client = reqwest::Client::new();
    let url = format!("http://localhost:{}/v1/chat/completions", port);
    let body = serde_json::json!({
        "model": "test-model",
        "messages": [{"role": "user", "content": "test"}]
    });
    let response = client.post(&url).json(&body).send().await.unwrap();
    assert!(response.status().is_success());
    // Sleep and wake through the orchestrator, checking each state transition.
    orchestrator
        .sleep_model("test-model", llmux::SleepLevel::L1)
        .await
        .unwrap();
    assert!(matches!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: Some(_) })
    ));
    orchestrator.wake_model("test-model").await.unwrap();
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: None })
    );
}
#[tokio::test]
#[serial]
async fn test_orchestrator_multiple_models() {
    use llmux::{ModelConfig, Orchestrator, ProcessState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let port_alpha = allocate_port();
    let port_beta = allocate_port();
    // Two independent model configs managed by a single orchestrator.
    let configs = HashMap::from([
        (
            "model-alpha".to_string(),
            ModelConfig {
                model_path: "model-alpha".to_string(),
                port: port_alpha,
                extra_args: vec![],
                sleep_level: 1,
            },
        ),
        (
            "model-beta".to_string(),
            ModelConfig {
                model_path: "model-beta".to_string(),
                port: port_beta,
                extra_args: vec![],
                sleep_level: 1,
            },
        ),
    ]);
    let orch = Arc::new(Orchestrator::with_command(configs, mock_vllm_path.to_string()));

    // Nothing runs until explicitly requested.
    for name in ["model-alpha", "model-beta"] {
        assert_eq!(orch.process_state(name).await, Some(ProcessState::NotStarted));
    }

    // Starting alpha must not disturb beta.
    orch.ensure_running("model-alpha")
        .await
        .expect("Failed to start model-alpha");
    assert_eq!(
        orch.process_state("model-alpha").await,
        Some(ProcessState::Running { sleeping: None })
    );
    assert_eq!(
        orch.process_state("model-beta").await,
        Some(ProcessState::NotStarted)
    );

    // Both models can run at the same time.
    orch.ensure_running("model-beta")
        .await
        .expect("Failed to start model-beta");
    for name in ["model-alpha", "model-beta"] {
        assert_eq!(
            orch.process_state(name).await,
            Some(ProcessState::Running { sleeping: None })
        );
    }

    // Sleeping alpha is independent of beta's state.
    orch.sleep_model("model-alpha", llmux::SleepLevel::L1)
        .await
        .unwrap();
    assert!(matches!(
        orch.process_state("model-alpha").await,
        Some(ProcessState::Running { sleeping: Some(_) })
    ));
    assert_eq!(
        orch.process_state("model-beta").await,
        Some(ProcessState::Running { sleeping: None })
    );

    // Alpha wakes back to a clean running state.
    orch.wake_model("model-alpha").await.unwrap();
    assert_eq!(
        orch.process_state("model-alpha").await,
        Some(ProcessState::Running { sleeping: None })
    );
}
#[tokio::test]
async fn test_switcher_basic_registration() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator};
    use std::sync::Arc;
    // Registration is derived purely from the config map; nothing is
    // spawned here.
    let mk = |port: u16| ModelConfig {
        model_path: "test".to_string(),
        port,
        extra_args: vec![],
        sleep_level: 1,
    };
    let configs = HashMap::from([
        ("model-a".to_string(), mk(8001)),
        ("model-b".to_string(), mk(8002)),
    ]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::new(configs)),
        Box::new(FifoPolicy::default()),
    );
    assert!(switcher.is_registered("model-a"));
    assert!(switcher.is_registered("model-b"));
    assert!(!switcher.is_registered("model-c"));
}
#[tokio::test]
async fn test_switcher_unregistered_model_error() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, SwitchError};
    use std::sync::Arc;
    let configs = HashMap::from([(
        "model-a".to_string(),
        ModelConfig {
            model_path: "test".to_string(),
            port: 8001,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::new(configs)),
        Box::new(FifoPolicy::default()),
    );
    // A model that was never configured must fail fast with ModelNotFound
    // rather than attempting a switch.
    let outcome = switcher.ensure_model_ready("nonexistent").await;
    assert!(matches!(outcome, Err(SwitchError::ModelNotFound(_))));
}
#[tokio::test]
async fn test_switcher_in_flight_tracking() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator};
    use std::sync::Arc;
    let configs = HashMap::from([(
        "model-a".to_string(),
        ModelConfig {
            model_path: "test".to_string(),
            port: 8001,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::new(configs)),
        Box::new(FifoPolicy::default()),
    );

    // Counter starts at zero, rises with each guard, falls as guards drop.
    assert_eq!(switcher.in_flight_count("model-a"), 0);
    let first = switcher.acquire_in_flight("model-a");
    assert!(first.is_some());
    assert_eq!(switcher.in_flight_count("model-a"), 1);
    let second = switcher.acquire_in_flight("model-a");
    assert!(second.is_some());
    assert_eq!(switcher.in_flight_count("model-a"), 2);
    drop(first);
    assert_eq!(switcher.in_flight_count("model-a"), 1);
    drop(second);
    assert_eq!(switcher.in_flight_count("model-a"), 0);

    // Unknown models never get a guard.
    assert!(switcher.acquire_in_flight("nonexistent").is_none());
}
#[tokio::test]
async fn test_switcher_initial_state() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, SwitcherState};
    use std::sync::Arc;
    let configs = HashMap::from([(
        "model-a".to_string(),
        ModelConfig {
            model_path: "test".to_string(),
            port: 8001,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::new(configs)),
        Box::new(FifoPolicy::default()),
    );
    // Before any request arrives the switcher is idle with no active model.
    assert_eq!(switcher.state().await, SwitcherState::Idle);
    assert_eq!(switcher.active_model().await, None);
}
#[tokio::test]
#[serial]
async fn test_switcher_ensure_model_ready() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, SwitcherState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let port = allocate_port();
    let configs = HashMap::from([(
        "test-model".to_string(),
        ModelConfig {
            model_path: "test-model".to_string(),
            port,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::with_command(configs, mock_vllm_path.to_string())),
        Box::new(FifoPolicy::default()),
    );
    assert_eq!(switcher.state().await, SwitcherState::Idle);

    // The first ensure_model_ready spawns the backend; bound it in time.
    let readied = tokio::time::timeout(
        Duration::from_secs(10),
        switcher.ensure_model_ready("test-model"),
    )
    .await;
    assert!(readied.is_ok(), "Timeout");
    assert!(readied.unwrap().is_ok(), "Failed to ensure model ready");

    // The switcher should now report the model as active.
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "test-model".to_string()
        }
    );
    assert_eq!(switcher.active_model().await, Some("test-model".to_string()));
}
#[tokio::test]
#[serial]
async fn test_switcher_model_switching() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, SwitcherState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    // Small builder for a named model entry on a given port.
    let mk = |name: &str, port: u16| {
        (
            name.to_string(),
            ModelConfig {
                model_path: name.to_string(),
                port,
                extra_args: vec![],
                sleep_level: 1,
            },
        )
    };
    let configs = HashMap::from([
        mk("model-a", allocate_port()),
        mk("model-b", allocate_port()),
    ]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::with_command(configs, mock_vllm_path.to_string())),
        Box::new(FifoPolicy::default()),
    );

    // a -> b -> a: every hop must leave the requested model active.
    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed to start model-a");
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-a".to_string()
        }
    );
    switcher
        .ensure_model_ready("model-b")
        .await
        .expect("Failed to switch to model-b");
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-b".to_string()
        }
    );
    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed to switch back to model-a");
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-a".to_string()
        }
    );
}
#[tokio::test]
#[serial]
async fn test_switcher_same_model_no_switch() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, SwitcherState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let configs = HashMap::from([(
        "model-a".to_string(),
        ModelConfig {
            model_path: "model-a".to_string(),
            port: allocate_port(),
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::with_command(configs, mock_vllm_path.to_string())),
        Box::new(FifoPolicy::default()),
    );
    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed to start model-a");

    // Re-requesting the already-active model must be a near-instant no-op,
    // not a sleep/wake cycle.
    let begun = std::time::Instant::now();
    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed second request");
    let took = begun.elapsed();
    assert!(
        took < Duration::from_millis(100),
        "Same model request took too long: {:?}",
        took
    );
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-a".to_string()
        }
    );
}
#[tokio::test]
async fn test_orchestrator_unknown_model() {
    use llmux::{ModelConfig, Orchestrator, OrchestratorError};
    use std::sync::Arc;
    let configs = HashMap::from([(
        "known-model".to_string(),
        ModelConfig {
            model_path: "test".to_string(),
            port: 8001,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let orch = Arc::new(Orchestrator::new(configs));

    // Every entry point must reject a name missing from the config map:
    // state lookup yields None, operations yield ModelNotFound.
    assert_eq!(orch.process_state("unknown").await, None);
    assert!(matches!(
        orch.ensure_running("unknown").await,
        Err(OrchestratorError::ModelNotFound(_))
    ));
    assert!(matches!(
        orch.sleep_model("unknown", llmux::SleepLevel::L1).await,
        Err(OrchestratorError::ModelNotFound(_))
    ));
    assert!(matches!(
        orch.wake_model("unknown").await,
        Err(OrchestratorError::ModelNotFound(_))
    ));
}
#[tokio::test]
#[serial]
async fn test_end_to_end_single_model() {
    use axum::Router;
    use llmux::{
        Config, FifoPolicy, ModelConfig, ModelSwitcher, ModelSwitcherLayer, Orchestrator,
        PolicyConfig,
    };
    use std::sync::Arc;
    use tokio::net::TcpListener;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let backend_port = allocate_port();
    let proxy_port = allocate_port();
    let models = HashMap::from([(
        "test-model".to_string(),
        ModelConfig {
            model_path: "test-model".to_string(),
            port: backend_port,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let config = Config {
        models: models.clone(),
        policy: PolicyConfig::default(),
        port: proxy_port,
        metrics_port: 0,
        vllm_command: mock_vllm_path.to_string(),
    };

    // Assemble the full proxy stack: onwards router + switcher middleware.
    let orchestrator = Arc::new(Orchestrator::with_command(
        config.models.clone(),
        config.vllm_command.clone(),
    ));
    let switcher = ModelSwitcher::new(orchestrator.clone(), Box::new(FifoPolicy::default()));
    let onwards_state = onwards::AppState::new(config.build_onwards_targets().unwrap());
    let app: Router =
        onwards::build_router(onwards_state).layer(ModelSwitcherLayer::new(switcher));
    let listener = TcpListener::bind(format!("127.0.0.1:{}", proxy_port))
        .await
        .unwrap();
    let proxy = tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    // Give the proxy a beat to start accepting connections.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // One chat round-trip through the proxy; the mock echoes the prompt.
    let response = reqwest::Client::new()
        .post(format!(
            "http://127.0.0.1:{}/v1/chat/completions",
            proxy_port
        ))
        .json(&serde_json::json!({
            "model": "test-model",
            "messages": [{"role": "user", "content": "Hello from e2e test!"}]
        }))
        .timeout(Duration::from_secs(15))
        .send()
        .await
        .expect("Request failed");
    assert!(
        response.status().is_success(),
        "Response status: {}",
        response.status()
    );
    let body: serde_json::Value = response.json().await.unwrap();
    let content = body["choices"][0]["message"]["content"].as_str().unwrap();
    assert!(
        content.contains("Hello from e2e test!"),
        "Unexpected response: {}",
        content
    );
    proxy.abort();
}
#[tokio::test]
#[serial]
async fn test_end_to_end_model_switching() {
    use axum::Router;
    use llmux::{
        Config, FifoPolicy, ModelConfig, ModelSwitcher, ModelSwitcherLayer, Orchestrator,
        PolicyConfig,
    };
    use std::sync::Arc;
    use tokio::net::TcpListener;

    // POST one chat message through the proxy, assert a 2xx status, and
    // return the parsed JSON body. `failure` is the panic message used if
    // the request itself fails.
    async fn ask(
        client: &reqwest::Client,
        url: &str,
        model: &str,
        prompt: &str,
        failure: &str,
    ) -> serde_json::Value {
        let response = client
            .post(url)
            .json(&serde_json::json!({
                "model": model,
                "messages": [{"role": "user", "content": prompt}]
            }))
            .timeout(Duration::from_secs(15))
            .send()
            .await
            .expect(failure);
        assert!(response.status().is_success());
        response.json().await.unwrap()
    }

    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let port_a = allocate_port();
    let port_b = allocate_port();
    let proxy_port = allocate_port();
    let models = HashMap::from([
        (
            "model-a".to_string(),
            ModelConfig {
                model_path: "model-a".to_string(),
                port: port_a,
                extra_args: vec![],
                sleep_level: 1,
            },
        ),
        (
            "model-b".to_string(),
            ModelConfig {
                model_path: "model-b".to_string(),
                port: port_b,
                extra_args: vec![],
                sleep_level: 1,
            },
        ),
    ]);
    let config = Config {
        models: models.clone(),
        policy: PolicyConfig::default(),
        port: proxy_port,
        metrics_port: 0,
        vllm_command: mock_vllm_path.to_string(),
    };
    let orchestrator = Arc::new(Orchestrator::with_command(
        config.models.clone(),
        config.vllm_command.clone(),
    ));
    let switcher = ModelSwitcher::new(orchestrator.clone(), Box::new(FifoPolicy::default()));
    let onwards_state = onwards::AppState::new(config.build_onwards_targets().unwrap());
    let app: Router =
        onwards::build_router(onwards_state).layer(ModelSwitcherLayer::new(switcher));
    let listener = TcpListener::bind(format!("127.0.0.1:{}", proxy_port))
        .await
        .unwrap();
    let proxy = tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    tokio::time::sleep(Duration::from_millis(50)).await;

    let client = reqwest::Client::new();
    let url = format!("http://127.0.0.1:{}/v1/chat/completions", proxy_port);

    // a -> b -> a through the proxy: each hop transparently switches the
    // active backend, and the mock echoes each prompt back.
    let body = ask(&client, &url, "model-a", "Hello A!", "Request to model-a failed").await;
    assert!(
        body["choices"][0]["message"]["content"]
            .as_str()
            .unwrap()
            .contains("Hello A!")
    );
    let body = ask(&client, &url, "model-b", "Hello B!", "Request to model-b failed").await;
    assert!(
        body["choices"][0]["message"]["content"]
            .as_str()
            .unwrap()
            .contains("Hello B!")
    );
    let body = ask(
        &client,
        &url,
        "model-a",
        "Back to A!",
        "Request back to model-a failed",
    )
    .await;
    assert!(
        body["choices"][0]["message"]["content"]
            .as_str()
            .unwrap()
            .contains("Back to A!")
    );
    proxy.abort();
}
#[tokio::test]
#[serial]
async fn test_end_to_end_unknown_model_passthrough() {
    use axum::Router;
    use llmux::{
        Config, FifoPolicy, ModelConfig, ModelSwitcher, ModelSwitcherLayer, Orchestrator,
        PolicyConfig,
    };
    use std::sync::Arc;
    use tokio::net::TcpListener;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let backend_port = allocate_port();
    let proxy_port = allocate_port();
    let models = HashMap::from([(
        "known-model".to_string(),
        ModelConfig {
            model_path: "known-model".to_string(),
            port: backend_port,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let config = Config {
        models: models.clone(),
        policy: PolicyConfig::default(),
        port: proxy_port,
        metrics_port: 0,
        vllm_command: mock_vllm_path.to_string(),
    };
    let orchestrator = Arc::new(Orchestrator::with_command(
        config.models.clone(),
        config.vllm_command.clone(),
    ));
    let switcher = ModelSwitcher::new(orchestrator.clone(), Box::new(FifoPolicy::default()));
    let onwards_state = onwards::AppState::new(config.build_onwards_targets().unwrap());
    let app: Router =
        onwards::build_router(onwards_state).layer(ModelSwitcherLayer::new(switcher));
    let listener = TcpListener::bind(format!("127.0.0.1:{}", proxy_port))
        .await
        .unwrap();
    let proxy = tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    tokio::time::sleep(Duration::from_millis(50)).await;

    // A model name that is not configured should pass through the switcher
    // layer and be answered with 404 rather than spawning anything.
    let response = reqwest::Client::new()
        .post(format!(
            "http://127.0.0.1:{}/v1/chat/completions",
            proxy_port
        ))
        .json(&serde_json::json!({
            "model": "unknown-model",
            "messages": [{"role": "user", "content": "test"}]
        }))
        .timeout(Duration::from_secs(5))
        .send()
        .await
        .expect("Request failed");
    assert_eq!(response.status(), reqwest::StatusCode::NOT_FOUND);
    proxy.abort();
}
#[tokio::test]
#[serial]
async fn test_l3_fallback_on_sleep_failure() {
    use llmux::{
        FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, ProcessState, SwitcherState,
    };
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let port_a = allocate_port();
    let port_b = allocate_port();
    let mk = |name: &str, port: u16| {
        (
            name.to_string(),
            ModelConfig {
                model_path: name.to_string(),
                port,
                extra_args: vec![],
                sleep_level: 1,
            },
        )
    };
    let configs = HashMap::from([mk("model-a", port_a), mk("model-b", port_b)]);
    let orchestrator = Arc::new(Orchestrator::with_command(configs, mock_vllm_path.to_string()));
    let switcher = ModelSwitcher::new(orchestrator.clone(), Box::new(FifoPolicy::default()));

    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed to start model-a");
    assert_eq!(
        orchestrator.process_state("model-a").await,
        Some(ProcessState::Running { sleeping: None })
    );

    // Make model-a's /sleep endpoint fail so the switcher has to fall back
    // to stopping the process entirely when switching away.
    let client = reqwest::Client::new();
    client
        .post(format!("http://localhost:{}/control/fail-sleep", port_a))
        .json(&serde_json::json!({ "enabled": true }))
        .send()
        .await
        .expect("Failed to set fail-sleep");

    // Switching to model-b must still succeed...
    switcher
        .ensure_model_ready("model-b")
        .await
        .expect("Failed to switch to model-b");
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-b".to_string()
        }
    );
    // ...with model-a fully stopped (the fallback) rather than sleeping.
    assert_eq!(
        orchestrator.process_state("model-a").await,
        Some(ProcessState::NotStarted)
    );

    // model-b serves traffic normally after the fallback.
    let response = client
        .post(format!("http://localhost:{}/v1/chat/completions", port_b))
        .json(&serde_json::json!({
            "model": "model-b",
            "messages": [{"role": "user", "content": "test after fallback"}]
        }))
        .send()
        .await
        .expect("Request to model-b failed");
    assert!(response.status().is_success());

    // Switching back restarts model-a from scratch.
    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed to switch back to model-a");
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-a".to_string()
        }
    );
    assert_eq!(
        orchestrator.process_state("model-a").await,
        Some(ProcessState::Running { sleeping: None })
    );
}
#[tokio::test]
#[serial]
async fn test_sleep_timeout_completes() {
    let server = MockServer::spawn("timeout-model").await;
    // Make the /sleep endpoint take ~2s to respond.
    server.set_sleep_delay(2000).await;

    let begun = std::time::Instant::now();
    server.sleep(1).await;
    let took = begun.elapsed();

    // The call must actually block for the configured delay...
    assert!(
        took >= Duration::from_secs(2),
        "Sleep completed too quickly ({:?}), delay not applied",
        took
    );
    // ...but still complete well short of a pathological hang.
    assert!(
        took < Duration::from_secs(10),
        "Sleep took too long ({:?})",
        took
    );

    let stats = server.stats().await;
    assert_eq!(stats["sleeping"], true);

    // The server remains fully usable after the delayed sleep.
    server.wake().await;
    let reply = server.chat("After delayed sleep").await;
    assert!(reply.get("choices").is_some());
}
#[tokio::test]
#[serial]
async fn test_end_to_end_concurrent_requests() {
    use axum::Router;
    use llmux::{
        Config, FifoPolicy, ModelConfig, ModelSwitcher, ModelSwitcherLayer, Orchestrator,
        PolicyConfig,
    };
    use std::sync::Arc;
    use tokio::net::TcpListener;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let backend_port = allocate_port();
    let proxy_port = allocate_port();
    let models = HashMap::from([(
        "test-model".to_string(),
        ModelConfig {
            model_path: "test-model".to_string(),
            port: backend_port,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let config = Config {
        models: models.clone(),
        policy: PolicyConfig::default(),
        port: proxy_port,
        metrics_port: 0,
        vllm_command: mock_vllm_path.to_string(),
    };
    let orchestrator = Arc::new(Orchestrator::with_command(
        config.models.clone(),
        config.vllm_command.clone(),
    ));
    let switcher = ModelSwitcher::new(orchestrator.clone(), Box::new(FifoPolicy::default()));
    let onwards_state = onwards::AppState::new(config.build_onwards_targets().unwrap());
    let app: Router =
        onwards::build_router(onwards_state).layer(ModelSwitcherLayer::new(switcher));
    let listener = TcpListener::bind(format!("127.0.0.1:{}", proxy_port))
        .await
        .unwrap();
    let proxy = tokio::spawn(async move {
        axum::serve(listener, app).await.unwrap();
    });
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Fire five chat requests in parallel; all must succeed even though
    // they race the initial model spawn.
    let client = reqwest::Client::new();
    let url = format!("http://127.0.0.1:{}/v1/chat/completions", proxy_port);
    let tasks: Vec<_> = (0..5)
        .map(|i| {
            let client = client.clone();
            let url = url.clone();
            tokio::spawn(async move {
                client
                    .post(&url)
                    .json(&serde_json::json!({
                        "model": "test-model",
                        "messages": [{"role": "user", "content": format!("Request {}", i)}]
                    }))
                    .timeout(Duration::from_secs(15))
                    .send()
                    .await
            })
        })
        .collect();
    for (i, task) in tasks.into_iter().enumerate() {
        let response = task.await.expect("Task panicked").expect("Request failed");
        assert!(
            response.status().is_success(),
            "Request {} failed with status {}",
            i,
            response.status()
        );
    }
    proxy.abort();
}
#[tokio::test]
#[serial]
async fn test_switch_cooldown_enforced() {
    use llmux::{FifoPolicy, ModelConfig, ModelSwitcher, Orchestrator, SwitcherState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let mk = |name: &str| {
        (
            name.to_string(),
            ModelConfig {
                model_path: name.to_string(),
                port: allocate_port(),
                extra_args: vec![],
                sleep_level: 1,
            },
        )
    };
    let configs = HashMap::from([mk("model-a"), mk("model-b")]);
    // NOTE(review): the last FifoPolicy::new argument is presumably the
    // switch cooldown (2s) — the timing assertion below expects >= 1.5s.
    // Confirm parameter meanings against FifoPolicy's definition.
    let policy = Box::new(FifoPolicy::new(
        1,
        Duration::from_secs(60),
        true,
        Duration::from_secs(2),
    ));
    let switcher = ModelSwitcher::new(
        Arc::new(Orchestrator::with_command(configs, mock_vllm_path.to_string())),
        policy,
    );
    switcher
        .ensure_model_ready("model-a")
        .await
        .expect("Failed to start model-a");
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-a".to_string()
        }
    );

    // An immediate switch to model-b must be delayed by the cooldown.
    let begun = std::time::Instant::now();
    switcher
        .ensure_model_ready("model-b")
        .await
        .expect("Failed to switch to model-b");
    let took = begun.elapsed();
    assert_eq!(
        switcher.state().await,
        SwitcherState::Active {
            model: "model-b".to_string()
        }
    );
    assert!(
        took >= Duration::from_millis(1500),
        "Switch completed too quickly ({:?}), cooldown not enforced",
        took
    );
}
#[tokio::test]
#[serial]
async fn test_zombie_process_recovery() {
    use llmux::{ModelConfig, Orchestrator, ProcessState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let port = allocate_port();
    let models = HashMap::from([(
        "test-model".to_string(),
        ModelConfig {
            model_path: "test-model".to_string(),
            port,
            extra_args: vec![],
            sleep_level: 1,
        },
    )]);
    let orchestrator = Arc::new(Orchestrator::with_command(models, mock_vllm_path.to_string()));
    orchestrator
        .ensure_running("test-model")
        .await
        .expect("Failed to start");
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: None })
    );

    // Best-effort wake to ensure the server is responsive before stopping.
    let client = reqwest::Client::new();
    let _ = client
        .post(format!("http://localhost:{}/wake_up", port))
        .send()
        .await;

    // SleepLevel::Stop tears the process down completely.
    orchestrator
        .sleep_model("test-model", llmux::SleepLevel::Stop)
        .await
        .unwrap();
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::NotStarted)
    );

    // Waking a stopped model must respawn it...
    orchestrator
        .wake_model("test-model")
        .await
        .expect("Failed to wake after process death");
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: None })
    );

    // ...and the new process must serve traffic.
    let response = client
        .post(format!("http://localhost:{}/v1/chat/completions", port))
        .json(&serde_json::json!({
            "model": "test-model",
            "messages": [{"role": "user", "content": "after recovery"}]
        }))
        .send()
        .await
        .expect("Request after recovery failed");
    assert!(response.status().is_success());
}
// Verifies that the orchestrator notices an externally-killed ("zombie")
// backend when asked to wake it, and restarts the process instead of
// assuming its bookkeeping is still accurate.
#[tokio::test]
#[serial]
async fn test_zombie_detection_on_wake() {
    use llmux::{ModelConfig, Orchestrator, ProcessState};
    use std::sync::Arc;
    let mock_vllm_path = env!("CARGO_BIN_EXE_mock-vllm");
    let port = allocate_port();
    let mut models = HashMap::new();
    models.insert(
        "test-model".to_string(),
        ModelConfig {
            model_path: "test-model".to_string(),
            port,
            extra_args: vec![],
            sleep_level: 1,
        },
    );
    let orchestrator = Arc::new(Orchestrator::with_command(
        models,
        mock_vllm_path.to_string(),
    ));
    orchestrator
        .ensure_running("test-model")
        .await
        .expect("Failed to start");
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: None })
    );
    // Kill the backend out from under the orchestrator. `lsof -ti` prints the
    // PIDs listening on the port; SIGKILL gives the child no chance to clean
    // up, so the orchestrator still believes it is running. Requires a Unix
    // host with lsof available (blocking std Command in an async test is
    // acceptable for this one-shot call).
    let output = std::process::Command::new("lsof")
        .args(["-ti", &format!("tcp:{}", port), "-sTCP:LISTEN"])
        .output()
        .expect("lsof failed");
    let pids = String::from_utf8_lossy(&output.stdout);
    for pid_str in pids.trim().lines() {
        if let Ok(pid) = pid_str.trim().parse::<i32>() {
            // SAFETY: kill(2) on a PID we just discovered; if the PID is
            // already stale the call fails harmlessly (ESRCH).
            unsafe {
                libc::kill(pid, libc::SIGKILL);
            }
        }
    }
    // Give the OS a moment to reap the killed process and release the port.
    tokio::time::sleep(Duration::from_millis(200)).await;
    // wake_model must detect the dead process and respawn it.
    orchestrator
        .wake_model("test-model")
        .await
        .expect("Failed to restart after zombie");
    assert_eq!(
        orchestrator.process_state("test-model").await,
        Some(ProcessState::Running { sleeping: None })
    );
    // The respawned backend serves traffic again.
    let client = reqwest::Client::new();
    let response = client
        .post(format!("http://localhost:{}/v1/chat/completions", port))
        .json(&serde_json::json!({
            "model": "test-model",
            "messages": [{"role": "user", "content": "after zombie recovery"}]
        }))
        .send()
        .await
        .expect("Request after zombie recovery failed");
    assert!(response.status().is_success());
}