use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;
use std::time::Duration;
use tokio::process::Command;
use super::applicator::ApplicatorError;
use super::trainer::TrainedModel;
/// Lifecycle state of the blue/green server pair as reported to callers.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ServerState {
    /// A server is running on the active endpoint.
    Active,
    /// A standby server is being started; the previously active server has
    /// not been stopped yet.
    Starting,
    /// The active endpoint has been switched and the previous server is being
    /// stopped.
    Releasing,
    /// No server is known to be running (initial state, and after stopping all).
    Down,
}

impl ServerState {
    /// Returns `true` for every state except [`ServerState::Down`].
    ///
    /// During `Starting` and `Releasing` one of the two servers is still
    /// running, so the endpoint is considered usable.
    pub fn is_available(&self) -> bool {
        match self {
            Self::Down => false,
            Self::Active | Self::Starting | Self::Releasing => true,
        }
    }
}
/// What a caller should do when the server is not available.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum SwitchingBehavior {
    /// Report unavailability right away (the default).
    #[default]
    FailImmediately,
    /// Poll until the server becomes available, giving up after `timeout_secs`.
    WaitForReady {
        /// Maximum time to wait, in seconds.
        timeout_secs: u64,
    },
}

impl SwitchingBehavior {
    /// Timeout used by [`SwitchingBehavior::wait`], in seconds.
    const DEFAULT_WAIT_TIMEOUT_SECS: u64 = 30;

    /// `WaitForReady` with the default 30-second timeout.
    pub fn wait() -> Self {
        Self::wait_with_timeout(Self::DEFAULT_WAIT_TIMEOUT_SECS)
    }

    /// `WaitForReady` with a caller-chosen timeout, in seconds.
    pub fn wait_with_timeout(timeout_secs: u64) -> Self {
        Self::WaitForReady { timeout_secs }
    }
}
/// Settings for launching and addressing the Blue and Green server processes.
#[derive(Debug, Clone)]
pub struct BlueGreenConfig {
    /// Model file passed to the server with `-m`; must exist when a server is started.
    pub base_model_path: PathBuf,
    /// Host both servers bind to (`--host`) and that endpoint URLs point at.
    pub host: String,
    /// Port used by the Blue slot.
    pub blue_port: u16,
    /// Port used by the Green slot. Presumably must differ from `blue_port`,
    /// since the old and new server run at the same time during a switch.
    pub green_port: u16,
    /// Passed to the server as `-ngl`.
    pub n_gpu_layers: u32,
    /// Passed to the server as `-c`.
    pub ctx_size: u32,
    /// Passed to the server as `-np`.
    pub parallel: u32,
    /// Directory holding `blue.pid`/`green.pid` and `blue.log`/`green.log`.
    pub data_dir: PathBuf,
    /// Server executable to launch.
    pub server_path: String,
    /// Seconds a newly started server is given to become ready.
    pub startup_timeout_secs: u64,
    /// How `BlueGreenManager::wait_until_available` behaves while no server is available.
    pub switching_behavior: SwitchingBehavior,
}
impl Default for BlueGreenConfig {
    /// Local defaults: Blue on 8080, Green on 8081, the `llama-server`
    /// executable, and state files under `<data dir>/swarm-engine/blue-green`.
    /// The current directory is used as the base when the platform data
    /// directory is unknown. `base_model_path` is left empty.
    fn default() -> Self {
        let mut data_dir = dirs::data_dir().unwrap_or_else(|| PathBuf::from("."));
        data_dir.push("swarm-engine");
        data_dir.push("blue-green");

        Self {
            base_model_path: PathBuf::new(),
            host: String::from("127.0.0.1"),
            blue_port: 8080,
            green_port: 8081,
            n_gpu_layers: 99,
            ctx_size: 4096,
            parallel: 4,
            server_path: String::from("llama-server"),
            startup_timeout_secs: 60,
            switching_behavior: SwitchingBehavior::default(),
            data_dir,
        }
    }
}
impl BlueGreenConfig {
    /// Creates a configuration for `base_model_path`, taking every other
    /// setting from [`BlueGreenConfig::default`].
    pub fn new(base_model_path: impl Into<PathBuf>) -> Self {
        Self {
            base_model_path: base_model_path.into(),
            ..Default::default()
        }
    }

    /// Sets the host both servers bind to and that endpoint URLs point at.
    pub fn host(mut self, host: impl Into<String>) -> Self {
        self.host = host.into();
        self
    }

    /// Sets the port used by the Blue slot.
    pub fn blue_port(mut self, port: u16) -> Self {
        self.blue_port = port;
        self
    }

    /// Sets the port used by the Green slot.
    pub fn green_port(mut self, port: u16) -> Self {
        self.green_port = port;
        self
    }

    /// Sets the value passed to the server as `-ngl`.
    pub fn n_gpu_layers(mut self, n: u32) -> Self {
        self.n_gpu_layers = n;
        self
    }

    /// Sets the context size passed to the server as `-c`.
    pub fn ctx_size(mut self, n: u32) -> Self {
        self.ctx_size = n;
        self
    }

    /// Sets the value passed to the server as `-np`.
    pub fn parallel(mut self, n: u32) -> Self {
        self.parallel = n;
        self
    }

    /// Sets the directory used for PID and log files.
    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.data_dir = path.into();
        self
    }

    /// Sets the server executable to launch.
    pub fn server_path(mut self, path: impl Into<String>) -> Self {
        self.server_path = path.into();
        self
    }

    /// Sets how many seconds a newly started server may take to become ready.
    pub fn startup_timeout_secs(mut self, secs: u64) -> Self {
        self.startup_timeout_secs = secs;
        self
    }

    /// Sets how callers react while no server is available.
    pub fn switching_behavior(mut self, behavior: SwitchingBehavior) -> Self {
        self.switching_behavior = behavior;
        self
    }

    /// Shorthand for `switching_behavior(SwitchingBehavior::wait())`.
    pub fn wait_during_switch(mut self) -> Self {
        self.switching_behavior = SwitchingBehavior::wait();
        self
    }

    /// Shorthand for `switching_behavior(SwitchingBehavior::wait_with_timeout(timeout_secs))`.
    pub fn wait_during_switch_with_timeout(mut self, timeout_secs: u64) -> Self {
        self.switching_behavior = SwitchingBehavior::wait_with_timeout(timeout_secs);
        self
    }

    /// PID file of the Blue server process.
    fn blue_pid_file(&self) -> PathBuf {
        self.data_dir.join("blue.pid")
    }

    /// PID file of the Green server process.
    fn green_pid_file(&self) -> PathBuf {
        self.data_dir.join("green.pid")
    }

    /// Combined stdout/stderr log of the Blue server process.
    fn blue_log_file(&self) -> PathBuf {
        self.data_dir.join("blue.log")
    }

    /// Combined stdout/stderr log of the Green server process.
    fn green_log_file(&self) -> PathBuf {
        self.data_dir.join("green.log")
    }
}
/// Runs two server processes ("Blue" and "Green") on separate ports and swaps
/// which one is active. A new server is started and made ready before the old
/// one is stopped, so LoRA adapters can be changed without the endpoint going down.
pub struct BlueGreenManager {
    /// Model, ports, launch flags and data directory.
    config: BlueGreenConfig,
    /// `true` when Blue is the active slot, `false` when Green is.
    active_is_blue: AtomicBool,
    /// Current lifecycle state, read through `state()`.
    state: RwLock<ServerState>,
    /// LoRA adapter paths the active server was last started with.
    loaded_loras: RwLock<Vec<PathBuf>>,
}
impl BlueGreenManager {
    /// Interval between readiness / availability polls.
    const POLL_INTERVAL: Duration = Duration::from_millis(500);
    /// Upper bound on a single TCP connect or `/health` request while polling,
    /// so a server that accepts connections but never answers cannot stall
    /// the readiness loop indefinitely.
    const PROBE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Creates a manager for `config`, creating `config.data_dir` if needed.
    ///
    /// The manager starts in [`ServerState::Down`] with Blue as the active slot
    /// and no LoRA adapters recorded.
    ///
    /// # Errors
    ///
    /// Returns [`ApplicatorError::Other`] if `blue_port == green_port`, or the
    /// converted I/O error if the data directory cannot be created.
    pub fn new(config: BlueGreenConfig) -> Result<Self, ApplicatorError> {
        // With a shared port the readiness probe of a freshly started standby
        // would be answered by the still-running active server, and the switch
        // would then stop the only process that is actually serving.
        if config.blue_port == config.green_port {
            return Err(ApplicatorError::Other(format!(
                "blue_port and green_port must differ (both are {})",
                config.blue_port
            )));
        }
        std::fs::create_dir_all(&config.data_dir)?;
        Ok(Self {
            config,
            active_is_blue: AtomicBool::new(true),
            state: RwLock::new(ServerState::Down),
            loaded_loras: RwLock::new(Vec::new()),
        })
    }

    /// Returns the configuration this manager was built with.
    pub fn config(&self) -> &BlueGreenConfig {
        &self.config
    }

    /// Returns a snapshot of the current lifecycle state.
    ///
    /// # Panics
    ///
    /// Panics if the state lock was poisoned by a panicking writer.
    pub fn state(&self) -> ServerState {
        self.state.read().unwrap().clone()
    }

    /// Base URL (`http://host:port`) of the slot currently marked active.
    ///
    /// Depends only on which slot is active, not on the lifecycle state.
    pub fn active_endpoint(&self) -> String {
        self.endpoint_for(self.is_blue_active())
    }

    /// Base URL (`http://host:port`) of the slot that is currently not active.
    pub fn standby_endpoint(&self) -> String {
        self.endpoint_for(!self.is_blue_active())
    }

    /// Starts the Blue server with `loras`, waits until it is ready, then marks
    /// Blue active, sets the state to [`ServerState::Active`] and records `loras`.
    ///
    /// # Errors
    ///
    /// Fails if the base model or an adapter file is missing, the process
    /// cannot be spawned, or the server does not become ready in time.
    pub async fn start(&self, loras: &[PathBuf]) -> Result<(), ApplicatorError> {
        tracing::info!(
            loras = ?loras,
            port = self.config.blue_port,
            "Starting Blue server"
        );
        self.start_server(true, loras).await?;
        self.active_is_blue.store(true, Ordering::SeqCst);
        self.set_state(ServerState::Active);
        *self.loaded_loras.write().unwrap() = loras.to_vec();
        Ok(())
    }

    /// Switches to a server running the currently loaded adapters plus
    /// `new_lora` (appended only if it is not already loaded).
    pub async fn switch_with_new_lora(&self, new_lora: &Path) -> Result<(), ApplicatorError> {
        let mut new_loras = self.loaded_loras.read().unwrap().clone();
        if !new_loras.iter().any(|p| p == new_lora) {
            new_loras.push(new_lora.to_path_buf());
        }
        self.switch_with_loras(&new_loras).await
    }

    /// Starts the standby slot with `loras`, makes it active once ready, then
    /// stops the previously active server.
    ///
    /// If the standby fails to start, the active slot and state are left as
    /// they were before the call. Failure to stop the old server is logged
    /// and otherwise ignored.
    pub async fn switch_with_loras(&self, loras: &[PathBuf]) -> Result<(), ApplicatorError> {
        let is_blue_active = self.is_blue_active();
        let standby_name = Self::slot_name(!is_blue_active);
        let old_name = Self::slot_name(is_blue_active);
        tracing::info!(
            standby = standby_name,
            loras = ?loras,
            "Starting standby server for switch"
        );

        let previous_state = self.state();
        // `Starting` tells callers the old server is still serving while the
        // standby boots. If nothing was running, keep reporting `Down` so
        // waiters do not treat the endpoint as usable before it is.
        if previous_state != ServerState::Down {
            self.set_state(ServerState::Starting);
        }
        if let Err(e) = self.start_server(!is_blue_active, loras).await {
            // The old server (if any) was not touched, so restore the prior state
            // instead of claiming `Active` unconditionally.
            self.set_state(previous_state);
            tracing::error!(error = %e, "Failed to start standby server");
            return Err(e);
        }

        tracing::info!(
            new_active = standby_name,
            "Standby ready, switching active endpoint"
        );
        self.active_is_blue.store(!is_blue_active, Ordering::SeqCst);
        self.set_state(ServerState::Releasing);

        tracing::info!(old = old_name, "Stopping old server");
        if let Err(e) = self.stop_server(is_blue_active).await {
            tracing::warn!(error = %e, "Failed to stop old server (continuing anyway)");
        }

        *self.loaded_loras.write().unwrap() = loras.to_vec();
        self.set_state(ServerState::Active);
        tracing::info!(
            active_endpoint = %self.active_endpoint(),
            "Switch completed"
        );
        Ok(())
    }

    /// Switches to a server that additionally loads `model.adapter_path`.
    pub async fn switch_with_model(&self, model: &TrainedModel) -> Result<(), ApplicatorError> {
        self.switch_with_new_lora(&model.adapter_path).await
    }

    /// Stops both slots (best effort) and sets the state to [`ServerState::Down`].
    ///
    /// Per-slot failures are logged, not returned; this always returns `Ok`.
    pub async fn stop_all(&self) -> Result<(), ApplicatorError> {
        tracing::info!("Stopping all servers");
        for is_blue in [true, false] {
            if let Err(e) = self.stop_server(is_blue).await {
                tracing::warn!(
                    server = Self::slot_name(is_blue),
                    error = %e,
                    "Failed to stop server"
                );
            }
        }
        self.set_state(ServerState::Down);
        Ok(())
    }

    /// Spawns the server for one slot, records its PID and waits until it is ready.
    ///
    /// stdout and stderr go to the slot's log file, which is truncated. If the
    /// server does not become ready, the spawned process is killed and its
    /// PID file removed.
    async fn start_server(&self, is_blue: bool, loras: &[PathBuf]) -> Result<(), ApplicatorError> {
        let port = self.port_for(is_blue);
        let name = Self::slot_name(is_blue);
        let (pid_file, log_file) = if is_blue {
            (self.config.blue_pid_file(), self.config.blue_log_file())
        } else {
            (self.config.green_pid_file(), self.config.green_log_file())
        };

        if !self.config.base_model_path.exists() {
            return Err(ApplicatorError::Other(format!(
                "Base model not found: {}",
                self.config.base_model_path.display()
            )));
        }
        for lora in loras {
            if !lora.exists() {
                return Err(ApplicatorError::AdapterNotFound(lora.clone()));
            }
        }

        let mut cmd = Command::new(&self.config.server_path);
        // Paths are passed as OS strings, so non-UTF-8 paths work instead of panicking.
        cmd.arg("-m").arg(&self.config.base_model_path);
        cmd.args([
            "--host",
            self.config.host.as_str(),
            "--port",
            &port.to_string(),
            "-ngl",
            &self.config.n_gpu_layers.to_string(),
            "-c",
            &self.config.ctx_size.to_string(),
            "-np",
            &self.config.parallel.to_string(),
            "--cont-batching",
        ]);
        if !loras.is_empty() {
            cmd.arg("--lora-init-without-apply");
            for lora in loras {
                cmd.arg("--lora").arg(lora);
            }
        }

        let log = std::fs::File::create(&log_file)?;
        let log_err = log.try_clone()?;
        cmd.stdout(Stdio::from(log));
        cmd.stderr(Stdio::from(log_err));

        let mut child = cmd.spawn().map_err(|e| {
            ApplicatorError::ServerStartFailed(format!("{} server start failed: {}", name, e))
        })?;

        // Never record a placeholder PID: `stop_server` would pass 0 to `kill`,
        // which signals the whole process group instead of one process.
        let pid = match child.id() {
            Some(pid) => pid,
            None => {
                return Err(ApplicatorError::ServerStartFailed(format!(
                    "{} server exited before its PID could be recorded",
                    name
                )))
            }
        };
        if let Err(e) = tokio::fs::write(&pid_file, pid.to_string()).await {
            // Without a PID file `stop_server` could never stop this process.
            let _ = child.kill().await;
            return Err(e.into());
        }

        tracing::info!(
            name,
            pid,
            port,
            loras = loras.len(),
            "Server process started, waiting for ready"
        );
        if let Err(e) = self.wait_for_ready(port, &mut child).await {
            // Don't leave a half-started server holding the port, or a PID file
            // that a later `stop_server` would act on.
            let _ = child.kill().await;
            let _ = tokio::fs::remove_file(&pid_file).await;
            return Err(e);
        }
        tracing::info!(name, port, "Server is ready");
        Ok(())
    }

    /// Sends the default `kill` signal to the PID recorded for one slot and
    /// removes its PID file.
    ///
    /// A missing PID file is treated as "already stopped".
    ///
    /// # Errors
    ///
    /// `ServerStopFailed("Invalid PID")` if the PID file does not hold a
    /// positive integer. I/O errors come from reading the file or running `kill`.
    async fn stop_server(&self, is_blue: bool) -> Result<(), ApplicatorError> {
        let pid_file = if is_blue {
            self.config.blue_pid_file()
        } else {
            self.config.green_pid_file()
        };
        let name = Self::slot_name(is_blue);

        let pid_str = match tokio::fs::read_to_string(&pid_file).await {
            Ok(contents) => contents,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(()),
            Err(e) => return Err(e.into()),
        };
        // 0 is rejected too: `kill 0` would signal our own process group.
        let pid: u32 = pid_str
            .trim()
            .parse::<u32>()
            .ok()
            .filter(|&pid| pid != 0)
            .ok_or_else(|| ApplicatorError::ServerStopFailed("Invalid PID".to_string()))?;

        let status = Command::new("kill").arg(pid.to_string()).status().await?;
        if !status.success() {
            tracing::debug!(name, pid, "Process already stopped or kill failed");
        }
        // Brief grace period after signalling, before the PID file is removed.
        tokio::time::sleep(Duration::from_millis(500)).await;
        let _ = tokio::fs::remove_file(&pid_file).await;
        tracing::debug!(name, pid, "Server stopped");
        Ok(())
    }

    /// Polls until the server on `port` answers `GET /health` with status 200.
    ///
    /// The first phase waits for the port to accept TCP connections. The second
    /// waits for the health check to pass. Both phases share one budget of
    /// `startup_timeout_secs * 2` polls spaced `POLL_INTERVAL` apart, so the
    /// total wait is roughly `startup_timeout_secs` plus time spent inside
    /// probes, each capped at `PROBE_TIMEOUT`.
    ///
    /// # Errors
    ///
    /// `ServerStartFailed` if `child` exits early or the budget runs out.
    async fn wait_for_ready(
        &self,
        port: u16,
        child: &mut tokio::process::Child,
    ) -> Result<(), ApplicatorError> {
        let addr = format!("{}:{}", self.config.host, port);
        let health_url = format!("http://{}:{}/health", self.config.host, port);
        let max_attempts = self.config.startup_timeout_secs.saturating_mul(2);
        let mut attempt: u64 = 0;

        // Phase 1: wait for the port to accept TCP connections.
        let mut tcp_connected = false;
        while attempt < max_attempts {
            attempt += 1;
            tokio::time::sleep(Self::POLL_INTERVAL).await;
            Self::ensure_still_running(child, port)?;
            let connect = tokio::net::TcpStream::connect(&addr);
            if matches!(
                tokio::time::timeout(Self::PROBE_TIMEOUT, connect).await,
                Ok(Ok(_))
            ) {
                tracing::debug!(attempt, port, "Server TCP connection established");
                tcp_connected = true;
                break;
            }
            tracing::trace!(attempt, port, "Waiting for server TCP...");
        }
        if !tcp_connected {
            return Err(ApplicatorError::ServerStartFailed(format!(
                "Timeout waiting for TCP connection on port {} ({}s)",
                port, self.config.startup_timeout_secs
            )));
        }

        // Phase 2: wait for `/health` to return 200. Probe once immediately so a
        // server that came up on the last TCP attempt is still checked.
        loop {
            let probe = Self::simple_http_get(&health_url);
            match tokio::time::timeout(Self::PROBE_TIMEOUT, probe).await {
                Ok(Ok(200)) => {
                    tracing::debug!(attempt, port, "Server health check passed");
                    return Ok(());
                }
                Ok(Ok(status)) => {
                    tracing::trace!(attempt, port, status, "Health check returned non-200");
                }
                Ok(Err(_)) | Err(_) => {
                    tracing::trace!(attempt, port, "Health check failed");
                }
            }
            if attempt >= max_attempts {
                break;
            }
            attempt += 1;
            tokio::time::sleep(Self::POLL_INTERVAL).await;
            Self::ensure_still_running(child, port)?;
        }
        Err(ApplicatorError::ServerStartFailed(format!(
            "Timeout waiting for health check on port {} ({}s)",
            port, self.config.startup_timeout_secs
        )))
    }

    /// Fails fast if the spawned server process has already exited, instead of
    /// waiting out the full startup timeout.
    fn ensure_still_running(
        child: &mut tokio::process::Child,
        port: u16,
    ) -> Result<(), ApplicatorError> {
        match child.try_wait()? {
            Some(status) => Err(ApplicatorError::ServerStartFailed(format!(
                "Server process for port {} exited before becoming ready ({})",
                port, status
            ))),
            None => Ok(()),
        }
    }

    /// Minimal HTTP/1.1 GET that returns only the response status code.
    ///
    /// `url` is expected as `http://host:port/path`. A missing path defaults to
    /// `/health`. Only the status line is read.
    async fn simple_http_get(url: &str) -> Result<u16, std::io::Error> {
        use tokio::io::{AsyncReadExt, AsyncWriteExt};
        let url = url.strip_prefix("http://").unwrap_or(url);
        let (host_port, path) = url.split_once('/').unwrap_or((url, "health"));
        let path = format!("/{}", path);
        let mut stream = tokio::net::TcpStream::connect(host_port).await?;
        let request = format!(
            "GET {} HTTP/1.1\r\nHost: {}\r\nConnection: close\r\n\r\n",
            path, host_port
        );
        stream.write_all(request.as_bytes()).await?;

        // TCP may deliver the status line in pieces, and one `read` could return
        // e.g. "HTTP/1.1 20", which would be misparsed as status 20. Keep
        // reading until the line is complete, the peer closes, or the buffer is full.
        let mut buf = [0u8; 256];
        let mut len = 0;
        while len < buf.len() && !buf[..len].contains(&b'\n') {
            let n = stream.read(&mut buf[len..]).await?;
            if n == 0 {
                break;
            }
            len += n;
        }

        let response = String::from_utf8_lossy(&buf[..len]);
        let status = response
            .lines()
            .next()
            .and_then(|line| line.split_whitespace().nth(1))
            // HTTP status codes are always exactly three digits.
            .filter(|code| code.len() == 3)
            .and_then(|code| code.parse::<u16>().ok());
        status.ok_or_else(|| {
            std::io::Error::new(std::io::ErrorKind::InvalidData, "Invalid HTTP response")
        })
    }

    /// Adapter paths the active server was last started with.
    pub fn loaded_loras(&self) -> Vec<PathBuf> {
        self.loaded_loras.read().unwrap().clone()
    }

    /// `true` when Blue is the active slot.
    pub fn is_blue_active(&self) -> bool {
        self.active_is_blue.load(Ordering::SeqCst)
    }

    /// The configured behaviour for callers while no server is available.
    pub fn switching_behavior(&self) -> &SwitchingBehavior {
        &self.config.switching_behavior
    }

    /// Resolves once the state is available, according to the configured
    /// [`SwitchingBehavior`].
    ///
    /// `FailImmediately` checks once. `WaitForReady` checks immediately and
    /// then every `POLL_INTERVAL` until `timeout_secs` has elapsed, including
    /// a final check after the last wait.
    ///
    /// # Errors
    ///
    /// [`ApplicatorError::Other`] if the server is still unavailable.
    pub async fn wait_until_available(&self) -> Result<(), ApplicatorError> {
        match &self.config.switching_behavior {
            SwitchingBehavior::FailImmediately => {
                if self.state().is_available() {
                    Ok(())
                } else {
                    Err(ApplicatorError::Other(
                        "Server not available (down)".to_string(),
                    ))
                }
            }
            SwitchingBehavior::WaitForReady { timeout_secs } => {
                let timeout_secs = *timeout_secs;
                let max_attempts = timeout_secs.saturating_mul(2);
                let mut attempt: u64 = 0;
                loop {
                    if self.state().is_available() {
                        tracing::debug!(attempt, "Server is available");
                        return Ok(());
                    }
                    if attempt >= max_attempts {
                        break;
                    }
                    attempt += 1;
                    tracing::trace!(attempt, "Waiting for server to become available...");
                    tokio::time::sleep(Self::POLL_INTERVAL).await;
                }
                Err(ApplicatorError::Other(format!(
                    "Timeout waiting for server availability ({}s)",
                    timeout_secs
                )))
            }
        }
    }

    /// Human-readable slot name used in logs and errors.
    fn slot_name(is_blue: bool) -> &'static str {
        if is_blue {
            "Blue"
        } else {
            "Green"
        }
    }

    /// Port assigned to the Blue (`true`) or Green (`false`) slot.
    fn port_for(&self, is_blue: bool) -> u16 {
        if is_blue {
            self.config.blue_port
        } else {
            self.config.green_port
        }
    }

    /// `http://host:port` URL of the Blue (`true`) or Green (`false`) slot.
    fn endpoint_for(&self, is_blue: bool) -> String {
        format!("http://{}:{}", self.config.host, self.port_for(is_blue))
    }

    /// Replaces the lifecycle state.
    fn set_state(&self, state: ServerState) {
        *self.state.write().unwrap() = state;
    }
}
/// Read-only view of where requests should go and whether they can be served.
///
/// Requires `Send + Sync` so implementors can be shared across threads.
pub trait EndpointResolver: Send + Sync {
    /// Base URL of the server that should receive requests right now.
    fn current_endpoint(&self) -> String;
    /// Whether a server is currently able to take requests.
    fn is_available(&self) -> bool;
    /// How callers should behave while the server is unavailable.
    fn switching_behavior(&self) -> SwitchingBehavior;
}
impl EndpointResolver for BlueGreenManager {
fn current_endpoint(&self) -> String {
self.active_endpoint()
}
fn is_available(&self) -> bool {
self.state().is_available()
}
fn switching_behavior(&self) -> SwitchingBehavior {
self.config.switching_behavior.clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test scratch directory under the system temp dir.
    ///
    /// `BlueGreenManager::new` creates `data_dir`. Without this, tests would
    /// create directories inside the user's real platform data directory.
    fn scratch_dir(test_name: &str) -> PathBuf {
        std::env::temp_dir()
            .join("swarm-engine-blue-green-tests")
            .join(test_name)
    }

    #[test]
    fn test_config_builder() {
        let config = BlueGreenConfig::new("/path/to/model.gguf")
            .host("0.0.0.0")
            .blue_port(9000)
            .green_port(9001)
            .n_gpu_layers(50)
            .parallel(2)
            .server_path("/opt/llama-server");
        assert_eq!(config.base_model_path, PathBuf::from("/path/to/model.gguf"));
        assert_eq!(config.host, "0.0.0.0");
        assert_eq!(config.blue_port, 9000);
        assert_eq!(config.green_port, 9001);
        assert_eq!(config.n_gpu_layers, 50);
        assert_eq!(config.parallel, 2);
        assert_eq!(config.server_path, "/opt/llama-server");
    }

    #[test]
    fn test_switching_behavior_builders() {
        assert_eq!(
            BlueGreenConfig::new("/model.gguf").switching_behavior,
            SwitchingBehavior::FailImmediately
        );
        assert_eq!(
            BlueGreenConfig::new("/model.gguf")
                .wait_during_switch()
                .switching_behavior,
            SwitchingBehavior::WaitForReady { timeout_secs: 30 }
        );
        assert_eq!(
            BlueGreenConfig::new("/model.gguf")
                .wait_during_switch_with_timeout(5)
                .switching_behavior,
            SwitchingBehavior::WaitForReady { timeout_secs: 5 }
        );
    }

    #[test]
    fn test_pid_log_paths() {
        let config = BlueGreenConfig::new("/model.gguf").data_dir("/tmp/test");
        assert_eq!(config.blue_pid_file(), PathBuf::from("/tmp/test/blue.pid"));
        assert_eq!(
            config.green_pid_file(),
            PathBuf::from("/tmp/test/green.pid")
        );
        assert_eq!(config.blue_log_file(), PathBuf::from("/tmp/test/blue.log"));
        assert_eq!(
            config.green_log_file(),
            PathBuf::from("/tmp/test/green.log")
        );
    }

    #[test]
    fn test_endpoint_switching() {
        let config = BlueGreenConfig::new("/model.gguf")
            .host("127.0.0.1")
            .blue_port(8080)
            .green_port(8081)
            .data_dir(scratch_dir("endpoint_switching"));
        let manager = BlueGreenManager::new(config).unwrap();
        assert!(manager.is_blue_active());
        assert_eq!(manager.active_endpoint(), "http://127.0.0.1:8080");
        assert_eq!(manager.standby_endpoint(), "http://127.0.0.1:8081");

        manager.active_is_blue.store(false, Ordering::SeqCst);
        assert!(!manager.is_blue_active());
        assert_eq!(manager.active_endpoint(), "http://127.0.0.1:8081");
        assert_eq!(manager.standby_endpoint(), "http://127.0.0.1:8080");
    }

    #[test]
    fn test_server_state() {
        let config = BlueGreenConfig::new("/model.gguf").data_dir(scratch_dir("server_state"));
        let manager = BlueGreenManager::new(config).unwrap();
        assert_eq!(manager.state(), ServerState::Down);
        assert!(!manager.state().is_available());

        *manager.state.write().unwrap() = ServerState::Active;
        assert_eq!(manager.state(), ServerState::Active);
        assert!(manager.state().is_available());

        *manager.state.write().unwrap() = ServerState::Starting;
        assert_eq!(manager.state(), ServerState::Starting);
        assert!(manager.state().is_available());

        *manager.state.write().unwrap() = ServerState::Releasing;
        assert_eq!(manager.state(), ServerState::Releasing);
        assert!(manager.state().is_available());
    }
}