use crate::proxy::monitor::ProxyMonitor;
use crate::proxy::{ProxyConfig, TokenManager};
use serde::{Deserialize, Serialize};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProxyStatus {
pub running: bool,
pub port: u16,
pub base_url: String,
pub active_accounts: usize,
}
#[derive(Clone)]
pub struct ProxyServiceState {
pub instance: Arc<RwLock<Option<ProxyServiceInstance>>>,
pub monitor: Arc<RwLock<Option<Arc<ProxyMonitor>>>>,
pub admin_server: Arc<RwLock<Option<AdminServerInstance>>>,
pub starting: Arc<AtomicBool>,
}
pub struct AdminServerInstance {
pub axum_server: crate::proxy::AxumServer,
pub server_handle: tokio::task::JoinHandle<()>,
}
pub struct ProxyServiceInstance {
pub token_manager: Arc<TokenManager>,
}
impl ProxyServiceState {
pub fn new() -> Self {
Self {
instance: Arc::new(RwLock::new(None)),
monitor: Arc::new(RwLock::new(None)),
admin_server: Arc::new(RwLock::new(None)),
starting: Arc::new(AtomicBool::new(false)),
}
}
}
struct StartingGuard(Arc<AtomicBool>);
impl Drop for StartingGuard {
fn drop(&mut self) {
self.0.store(false, Ordering::SeqCst);
}
}
async fn ensure_monitor(state: &ProxyServiceState) -> Arc<ProxyMonitor> {
let (monitor, needs_startup_maintenance) = {
let mut monitor_lock = state.monitor.write().await;
if let Some(existing) = monitor_lock.as_ref() {
(existing.clone(), false)
} else {
let created = Arc::new(ProxyMonitor::new(1000));
*monitor_lock = Some(created.clone());
(created, true)
}
};
if needs_startup_maintenance {
monitor.run_startup_maintenance().await;
}
monitor
}
pub async fn internal_start_proxy_service(
config: ProxyConfig,
state: &ProxyServiceState,
integration: crate::modules::system::integration::SystemManager,
) -> Result<ProxyStatus, String> {
{
let instance_lock = state.instance.read().await;
if instance_lock.is_some() {
return Err("Service is already running".to_string());
}
}
if state
.starting
.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
.is_err()
{
return Err("Service is starting, please wait...".to_string());
}
let _starting_guard = StartingGuard(state.starting.clone());
let monitor = ensure_monitor(state).await;
monitor.set_enabled(config.enable_logging);
crate::proxy::middleware::client_ip::set_trusted_proxies(config.trusted_proxies.clone());
ensure_admin_server(config.clone(), state, integration.clone()).await?;
let token_manager = {
let admin_lock = state.admin_server.read().await;
admin_lock
.as_ref()
.unwrap()
.axum_server
.token_manager
.clone()
};
token_manager.start_auto_cleanup().await;
token_manager
.update_sticky_config(config.scheduling.clone())
.await;
token_manager.update_session_binding_persistence(config.persist_session_bindings);
token_manager
.update_compliance_config(config.compliance.clone())
.await;
let app_config = crate::modules::system::config::load_app_config()
.unwrap_or_else(|_| crate::models::AppConfig::new());
token_manager
.update_circuit_breaker_config(app_config.circuit_breaker)
.await;
if let Some(ref account_id) = config.preferred_account_id {
token_manager
.set_preferred_account(Some(account_id.clone()))
.await;
tracing::info!("🔒 Fixed account mode restored: {}", account_id);
}
let active_accounts = token_manager.load_accounts().await.unwrap_or(0);
tracing::info!(
"[Startup Health] Running startup token refresh checks in paced sequential mode to avoid simultaneous Google connections"
);
let _ = token_manager.run_startup_health_check().await;
token_manager.restore_persisted_session_bindings();
if active_accounts == 0 {
let zai_enabled = config.zai.enabled
&& !matches!(config.zai.dispatch_mode, crate::proxy::ZaiDispatchMode::Off);
if !zai_enabled {
tracing::warn!(
"[W-PROXY-NO-ACCOUNTS] no_available_accounts_proxy_logic_paused_add_accounts_via_management_interface"
);
return Ok(ProxyStatus {
running: false,
port: config.port,
base_url: format!("http://127.0.0.1:{}", config.port),
active_accounts: 0,
});
}
}
let mut instance_lock = state.instance.write().await;
let admin_lock = state.admin_server.read().await;
let axum_server = admin_lock.as_ref().unwrap().axum_server.clone();
let instance = ProxyServiceInstance {
token_manager: token_manager.clone(),
};
axum_server.set_running(true).await;
*instance_lock = Some(instance);
Ok(ProxyStatus {
running: true,
port: config.port,
base_url: format!("http://127.0.0.1:{}", config.port),
active_accounts,
})
}
pub async fn ensure_admin_server(
config: ProxyConfig,
state: &ProxyServiceState,
integration: crate::modules::system::integration::SystemManager,
) -> Result<(), String> {
let mut admin_lock = state.admin_server.write().await;
if admin_lock.is_some() {
return Ok(());
}
crate::proxy::middleware::client_ip::set_trusted_proxies(config.trusted_proxies.clone());
let monitor = ensure_monitor(state).await;
let app_data_dir = crate::modules::auth::account::get_data_dir()?;
let token_manager = Arc::new(TokenManager::new(app_data_dir));
let _ = token_manager.load_accounts().await;
let start_config = crate::proxy::AxumStartConfig {
host: config.get_bind_address().to_string(),
port: config.port,
token_manager,
custom_mapping: config.custom_mapping.clone(),
request_timeout: config.request_timeout,
upstream_proxy: config.upstream_proxy.clone(),
user_agent_override: config.user_agent_override.clone(),
cors_config: config.cors.clone(),
security_config: crate::proxy::ProxySecurityConfig::from_proxy_config(&config),
zai_config: config.zai.clone(),
monitor,
experimental_config: config.experimental.clone(),
debug_logging: config.debug_logging.clone(),
google_config: config.google.clone(),
integration: integration.clone(),
proxy_pool_config: config.proxy_pool.clone(),
};
let (axum_server, server_handle) = match crate::proxy::AxumServer::start(start_config).await {
Ok((server, handle)) => (server, handle),
Err(e) => return Err(format!("Failed to start management server: {}", e)),
};
*admin_lock = Some(AdminServerInstance {
axum_server,
server_handle,
});
crate::proxy::update_thinking_budget_config(config.thinking_budget.clone());
Ok(())
}
pub async fn internal_stop_proxy_service(state: &ProxyServiceState) -> Result<(), String> {
{
let mut instance_lock = state.instance.write().await;
*instance_lock = None;
}
let admin_instance = {
let mut admin_lock = state.admin_server.write().await;
admin_lock.take()
};
if let Some(admin_instance) = admin_instance {
admin_instance.axum_server.set_running(false).await;
admin_instance.axum_server.request_shutdown();
let mut server_handle = admin_instance.server_handle;
match tokio::time::timeout(std::time::Duration::from_secs(15), &mut server_handle).await {
Ok(Ok(())) => {
tracing::info!("Proxy server task exited cleanly");
}
Ok(Err(e)) => {
tracing::warn!(
"[W-SHUTDOWN-JOIN] proxy_server_task_join_error_during_shutdown: {}",
e
);
}
Err(_) => {
tracing::warn!(
"[W-SHUTDOWN-TIMEOUT] proxy_server_shutdown_timed_out_aborting_server_task"
);
server_handle.abort();
let _ = server_handle.await;
}
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::{internal_start_proxy_service, internal_stop_proxy_service, ProxyServiceState};
use crate::modules::persistence::proxy_db;
use crate::modules::system::integration::SystemManager;
use crate::proxy::monitor::ProxyRequestLog;
use crate::proxy::ProxyConfig;
use crate::test_utils::{lock_env, ScopedEnvVar};
use std::sync::OnceLock;
use tokio::sync::Mutex;
static PROXY_STARTUP_TEST_LOCK: OnceLock<Mutex<()>> = OnceLock::new();
fn reserve_local_port() -> u16 {
let listener =
std::net::TcpListener::bind("127.0.0.1:0").expect("bind ephemeral test listener");
listener
.local_addr()
.expect("ephemeral listener local_addr")
.port()
}
#[tokio::test(flavor = "current_thread")]
async fn startup_runs_monitor_maintenance_and_initializes_proxy_db() {
let _security_guard = crate::proxy::tests::acquire_security_test_lock();
let _env_guard = lock_env();
let _guard = PROXY_STARTUP_TEST_LOCK
.get_or_init(|| Mutex::new(()))
.lock()
.await;
let data_dir = std::env::temp_dir().join(format!(
".gephyr-proxy-startup-test-{}",
uuid::Uuid::new_v4()
));
std::fs::create_dir_all(&data_dir).expect("create temp data dir");
let _data_dir_env = ScopedEnvVar::set("DATA_DIR", data_dir.to_string_lossy().as_ref());
proxy_db::init_db().expect("proxy db init");
let old_log_id = format!("startup-maintenance-{}", uuid::Uuid::new_v4());
let old_timestamp = chrono::Utc::now().timestamp() - (40 * 24 * 3600);
let seeded_old_log = ProxyRequestLog {
id: old_log_id.clone(),
timestamp: old_timestamp,
method: "GET".to_string(),
url: "/maintenance-test".to_string(),
status: 200,
duration: 1,
model: None,
mapped_model: None,
account_email: None,
client_ip: Some("127.0.0.1".to_string()),
correlation_id: None,
request_id: None,
error: None,
request_body: None,
response_body: None,
input_tokens: None,
output_tokens: None,
protocol: None,
username: None,
};
proxy_db::save_log(&seeded_old_log).expect("seed old proxy log");
let config = ProxyConfig {
port: reserve_local_port(),
enable_logging: false,
..ProxyConfig::default()
};
let state = ProxyServiceState::new();
let start_result =
internal_start_proxy_service(config, &state, SystemManager::Headless).await;
assert!(
start_result.is_ok(),
"service start should succeed: {:?}",
start_result.err()
);
assert!(
proxy_db::get_log_detail(&old_log_id).is_err(),
"startup maintenance should clean old proxy logs"
);
let _ = internal_stop_proxy_service(&state).await;
let _ = std::fs::remove_dir_all(&data_dir);
}
}