#[allow(dead_code)]
pub mod acp_client;
pub mod active_commands;
pub mod api;
pub mod db;
pub mod proposal_session;
pub mod registry;
pub mod runner;
pub mod terminal;
use std::io::Write;
use std::net::SocketAddr;
use tracing::{info, warn};
use crate::config::{ProposalSessionConfig, ServerConfig};
use crate::error::Result;
use crate::server::db::ServerDb;
fn server_base_url(bind: &str, actual_port: u16) -> String {
crate::web::build_access_url(bind, actual_port)
}
/// Runs the server daemon until it shuts down.
///
/// Startup proceeds in this order:
/// 1. Validate `config`, create the shared registry and open the server
///    database, both rooted at `config.data_dir`.
/// 2. Run a best-effort log cleanup and replay persisted change states into
///    the registry.
/// 3. Create the proposal session manager and try to restore every active
///    proposal session stored in the database.
/// 4. Assemble the [`api::AppState`], spawn the remote sync state monitor and
///    build the router.
/// 5. Bind the TCP listener, print the access URL on stdout and spawn the
///    periodic background tasks (proposal session timeout scan every 60 s,
///    log cleanup every 24 h).
/// 6. Serve requests until Ctrl+C is received, then clean up all proposal
///    sessions.
///
/// # Errors
///
/// Returns an error when the configuration is invalid, the registry or the
/// database cannot be created, persisted change states or proposal sessions
/// cannot be loaded, the bind address cannot be parsed, the listener cannot
/// be bound or queried for its local address, or the server itself fails.
///
/// # Panics
///
/// The shutdown-signal future panics if the Ctrl+C handler cannot be
/// installed.
pub async fn run_server(
    config: ServerConfig,
    resolve_command: Option<String>,
    proposal_session_config: ProposalSessionConfig,
) -> Result<()> {
    config.validate()?;

    let registry = registry::create_shared_registry(&config.data_dir, config.max_concurrent_total)?;
    let db = ServerDb::new(&config.data_dir)?;

    // Best-effort: a failed cleanup is logged but never aborts startup.
    // NOTE(review): `30` is presumably a retention period in days — confirm
    // against `ServerDb::cleanup_old_logs`.
    if let Err(e) = db.cleanup_old_logs(30) {
        warn!(error = %e, "Failed to run startup log cleanup");
    }

    // Replay persisted per-change state into the in-memory registry. The
    // block scope releases the write guard before startup continues.
    {
        let mut reg = registry.write().await;
        for row in db.load_change_states()? {
            reg.set_change_state(
                &row.project_id,
                &row.change_id,
                row.selected,
                row.error_message,
            );
        }
    }

    let runners = runner::create_shared_runners();

    // A token is only resolved in bearer-token mode; with auth disabled the
    // state carries no token at all.
    let auth_token = match &config.auth.mode {
        crate::config::ServerAuthMode::BearerToken => config.auth.resolve_token(),
        crate::config::ServerAuthMode::None => None,
    };

    // The initial receiver is discarded here; only the sender is kept in the
    // application state.
    let (log_tx, _) = tokio::sync::broadcast::channel(crate::server::api::SERVER_LOG_BUFFER_SIZE);
    let state_update_tx = crate::server::api::create_state_update_channel();

    let proposal_session_manager = proposal_session::create_proposal_session_manager(
        proposal_session_config,
        Some(db.clone()),
    );

    // Restore proposal sessions that were active when the server last ran.
    // A session that fails to restore is logged and skipped so that it cannot
    // prevent the server from starting.
    {
        let active_sessions = db.load_active_proposal_sessions()?;
        let mut manager = proposal_session_manager.write().await;
        for row in active_sessions {
            match manager.restore_session(&row).await {
                Ok(Some(_)) => {
                    info!(session_id = %row.id, "Restored proposal session from database");
                }
                Ok(None) => {
                    info!(session_id = %row.id, "Removed stale persisted proposal session");
                }
                Err(e) => {
                    warn!(
                        session_id = %row.id,
                        error = %e,
                        "Failed to restore persisted proposal session"
                    );
                }
            }
        }
    }

    let app_state = api::AppState {
        registry,
        runners,
        db: Some(db.clone()),
        auth_token,
        max_concurrent_total: config.max_concurrent_total,
        resolve_command,
        log_tx,
        state_update_tx,
        orchestration_status: std::sync::Arc::new(tokio::sync::RwLock::new(
            registry::OrchestrationStatus::default(),
        )),
        shared_orchestrator_state: std::sync::Arc::new(tokio::sync::RwLock::new(
            crate::orchestration::state::OrchestratorState::new(Vec::new(), 1),
        )),
        terminal_manager: terminal::create_terminal_manager(),
        active_commands: active_commands::create_shared_active_commands(),
        proposal_session_manager,
    };

    // Detached background task: monitors remote sync state for the registry.
    {
        let registry_for_monitor = app_state.registry.clone();
        tokio::spawn(async move {
            api::run_remote_sync_state_monitor(registry_for_monitor).await;
        });
    }

    let router = api::build_router(app_state.clone());

    // `SocketAddr` only accepts IPv6 hosts in bracketed form (`[::1]:8080`),
    // so a bare IPv6 bind such as `::1` must be wrapped before the port is
    // appended. Every other value is combined as `host:port`, as before.
    let bind_is_bare_ipv6 = config.bind.parse::<std::net::Ipv6Addr>().is_ok();
    let addr_text = if bind_is_bare_ipv6 {
        format!("[{}]:{}", config.bind, config.port)
    } else {
        format!("{}:{}", config.bind, config.port)
    };
    let addr: SocketAddr = addr_text.parse().map_err(|e| {
        crate::error::OrchestratorError::ConfigLoad(format!(
            "Invalid server address '{}:{}': {}",
            config.bind, config.port, e
        ))
    })?;

    info!("Starting server daemon on {}", addr);

    let listener = tokio::net::TcpListener::bind(addr).await.map_err(|e| {
        crate::error::OrchestratorError::Io(std::io::Error::other(format!(
            "Failed to bind to {}: {}",
            addr, e
        )))
    })?;

    // Ask the listener for the address it actually bound to, so the
    // announced URL carries the real port rather than the configured one.
    let actual_addr = listener.local_addr().map_err(|e| {
        crate::error::OrchestratorError::Io(std::io::Error::other(format!(
            "Failed to resolve bound address: {}",
            e
        )))
    })?;

    // The URL is printed on stdout and flushed immediately; a flush failure
    // is deliberately ignored.
    let url = server_base_url(&config.bind, actual_addr.port());
    println!("{}", url);
    let _ = std::io::stdout().flush();
    info!("Server daemon listening on {}", url);

    // Detached background task: scan proposal sessions for timeouts once a
    // minute.
    let psm_for_scanner = app_state.proposal_session_manager.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(60));
        loop {
            interval.tick().await;
            let mut manager = psm_for_scanner.write().await;
            manager.scan_timeouts().await;
        }
    });

    // Detached background task: repeat the log cleanup once a day.
    let db_for_cleanup = db.clone();
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(24 * 60 * 60));
        // The first tick of a tokio interval completes immediately. Consume
        // it so the first periodic run happens a full period after the
        // startup cleanup instead of duplicating it right away.
        interval.tick().await;
        loop {
            interval.tick().await;
            match db_for_cleanup.cleanup_old_logs(30) {
                Ok(deleted) => info!(deleted, "Completed periodic log cleanup"),
                Err(e) => warn!(error = %e, "Failed periodic log cleanup"),
            }
        }
    });

    // Graceful shutdown: wait for Ctrl+C, then clean up all proposal
    // sessions before the shutdown future completes.
    let psm_for_shutdown = app_state.proposal_session_manager.clone();
    let server = axum::serve(listener, router).with_graceful_shutdown(async move {
        tokio::signal::ctrl_c()
            .await
            .expect("Failed to install Ctrl+C handler");
        info!("Shutdown signal received, cleaning up proposal sessions...");
        let mut manager = psm_for_shutdown.write().await;
        manager.cleanup_all(None).await;
        info!("Proposal session cleanup complete");
    });

    server.await.map_err(|e| {
        crate::error::OrchestratorError::Io(std::io::Error::other(format!("Server error: {}", e)))
    })?;

    Ok(())
}
#[cfg(test)]
mod tests {
    use super::server_base_url;

    /// Loopback binds, whether numeric or by name, are announced as
    /// `localhost`.
    #[test]
    fn test_server_base_url_localhost() {
        for bind in ["127.0.0.1", "localhost"] {
            assert_eq!(server_base_url(bind, 39876), "http://localhost:39876");
        }
    }

    /// A concrete interface address appears unchanged in the URL.
    #[test]
    fn test_server_base_url_specific_address() {
        let announced = server_base_url("192.168.1.50", 9000);
        assert_eq!(announced, "http://192.168.1.50:9000");
    }

    /// The wildcard bind address must not appear verbatim in the URL, while
    /// the scheme and port are still present.
    #[test]
    fn test_server_base_url_zero_address_does_not_expose_zeros() {
        let announced = server_base_url("0.0.0.0", 8080);
        assert!(announced.starts_with("http://"));
        assert!(announced.ends_with(":8080"));
        assert!(!announced.contains("0.0.0.0"));
    }
}