mod api;
mod output;
mod supervisor;
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::UnixListener;
use ubermind_core::config;
use ubermind_core::protocol::{self, Request, Response};
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().init();
let args: Vec<String> = std::env::args().skip(1).collect();
let _foreground = args.iter().any(|a| a == "--foreground" || a == "-f");
let enable_http = args.iter().any(|a| a == "--http");
let global_config = config::load_global_config();
let port = global_config.daemon.port;
let http_port = if enable_http { Some(port) } else { None };
let supervisor = supervisor::Supervisor::new(global_config.clone(), http_port);
let state_dir = protocol::state_dir();
let _ = std::fs::create_dir_all(&state_dir);
let pid_path = protocol::pid_path();
let _ = std::fs::write(&pid_path, std::process::id().to_string());
let socket_path = protocol::socket_path();
if socket_path.exists() {
let _ = std::fs::remove_file(&socket_path);
}
output::expire_logs(global_config.logs.max_age_days, global_config.logs.max_files);
{
let config = global_config.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
output::expire_logs(config.logs.max_age_days, config.logs.max_files);
}
});
}
let sup_socket = Arc::clone(&supervisor);
let socket_handle = tokio::spawn(async move {
run_socket_server(sup_socket, &socket_path).await;
});
let http_handle = if enable_http {
let sup_http = Arc::clone(&supervisor);
Some(tokio::spawn(async move {
run_http_server(sup_http, port).await;
}))
} else {
None
};
tracing::info!("daemon started (pid {})", std::process::id());
if enable_http {
tracing::info!("HTTP server on port {}", port);
}
tokio::select! {
_ = socket_handle => {},
_ = async {
if let Some(h) = http_handle { h.await.ok(); }
else { std::future::pending::<()>().await; }
} => {},
_ = tokio::signal::ctrl_c() => {
tracing::info!("shutting down");
}
}
let _ = std::fs::remove_file(protocol::socket_path());
let _ = std::fs::remove_file(protocol::pid_path());
}
async fn run_socket_server(supervisor: Arc<supervisor::Supervisor>, socket_path: &std::path::Path) {
let listener = match UnixListener::bind(socket_path) {
Ok(l) => l,
Err(e) => {
tracing::error!("failed to bind socket: {}", e);
return;
}
};
tracing::info!("listening on {}", socket_path.display());
loop {
let (stream, _) = match listener.accept().await {
Ok(s) => s,
Err(e) => {
tracing::error!("accept error: {}", e);
continue;
}
};
let sup = Arc::clone(&supervisor);
tokio::spawn(async move {
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
let request: Request = match serde_json::from_str(&line) {
Ok(r) => r,
Err(e) => {
let resp = Response::Error {
message: format!("invalid request: {}", e),
};
let _ = write_response(&mut writer, &resp).await;
continue;
}
};
let response = handle_request(&sup, request).await;
if write_response(&mut writer, &response).await.is_err() {
break;
}
}
});
}
}
async fn handle_request(supervisor: &Arc<supervisor::Supervisor>, request: Request) -> Response {
match request {
Request::Ping => Response::Pong,
Request::Status => {
let services = supervisor.status().await;
Response::Status { services, http_port: supervisor.http_port }
}
Request::Start { names, all, processes } => {
let mut messages = Vec::new();
for name in &names {
match supervisor.start_service_filtered(name, all, &processes).await {
Ok(msg) => messages.push(msg),
Err(e) => return Response::Error { message: e },
}
}
Response::Ok {
message: Some(messages.join("\n")),
}
}
Request::Stop { names } => {
let mut messages = Vec::new();
for name in &names {
match supervisor.stop_service(name).await {
Ok(msg) => messages.push(msg),
Err(e) => return Response::Error { message: e },
}
}
Response::Ok {
message: Some(messages.join("\n")),
}
}
Request::Reload { names, all, processes } => {
let mut messages = Vec::new();
for name in &names {
match supervisor.reload_service_filtered(name, all, &processes).await {
Ok(msg) => messages.push(msg),
Err(e) => return Response::Error { message: e },
}
}
Response::Ok {
message: Some(messages.join("\n")),
}
}
Request::Restart { service, process } => {
match supervisor.restart_process(&service, &process).await {
Ok(msg) => Response::Ok { message: Some(msg) },
Err(e) => Response::Error { message: e },
}
}
Request::Kill { service, process } => {
match supervisor.kill_process(&service, &process).await {
Ok(msg) => Response::Ok { message: Some(msg) },
Err(e) => Response::Error { message: e },
}
}
Request::Logs { service, process, follow: _ } => {
match supervisor.get_output(&service, process.as_deref()).await {
Ok(capture) => {
let snapshot = capture.snapshot().await;
Response::Log {
line: String::from_utf8_lossy(&snapshot).to_string(),
}
}
Err(e) => Response::Error { message: e },
}
}
Request::Shutdown => {
tokio::spawn(async {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
std::process::exit(0);
});
Response::Ok {
message: Some("shutting down".to_string()),
}
}
}
}
async fn write_response(
writer: &mut tokio::net::unix::OwnedWriteHalf,
response: &Response,
) -> Result<(), std::io::Error> {
let mut data = serde_json::to_vec(response).unwrap();
data.push(b'\n');
writer.write_all(&data).await
}
async fn run_http_server(supervisor: Arc<supervisor::Supervisor>, port: u16) {
let app = api::router(supervisor);
let addr = std::net::SocketAddr::from(([127, 0, 0, 1], port));
let listener = match tokio::net::TcpListener::bind(addr).await {
Ok(l) => l,
Err(e) => {
tracing::error!("failed to bind HTTP on {}: {}", addr, e);
return;
}
};
tracing::info!("HTTP listening on {}", addr);
if let Err(e) = axum::serve(listener, app).await {
tracing::error!("HTTP server error: {}", e);
}
}