use std::path::PathBuf;
use std::sync::Arc;
use clap::Parser;
use tokio::sync::{mpsc, oneshot};
use tracing_subscriber::EnvFilter;
use zinit::sdk::{ServiceState, socket};
use zinit::server::ipc::{self, IpcCommand};
use zinit::server::supervisor::Supervisor;
use zinit::server::syslog::{self, SyslogReceiver};
use zinit::server::xinet::{IsRunningFn, StartServiceFn, StopServiceFn, XinetManager};
#[derive(Debug, Clone, Copy)]
enum ShutdownType {
Normal,
PrepareRestart,
}
#[derive(Parser, Debug)]
#[command(name = "zinit-server")]
#[command(version, about, long_about = None)]
struct Args {
#[arg(short, long)]
config_dir: Option<PathBuf>,
#[arg(short, long)]
socket: Option<PathBuf>,
#[arg(long, default_value = "info")]
log_level: String,
#[arg(long)]
pid1_mode: bool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let args = Args::parse();
let config_dir = args.config_dir.unwrap_or_else(|| {
if args.pid1_mode {
socket::system_config_dir()
} else {
socket::user_config_dir()
}
});
let socket_path = args.socket.unwrap_or_else(|| {
if args.pid1_mode {
socket::system_path()
} else {
socket::user_path()
}
});
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level));
tracing_subscriber::fmt()
.with_env_filter(filter)
.with_target(false)
.init();
if let Some(parent) = socket_path.parent()
&& !parent.exists()
{
std::fs::create_dir_all(parent)?;
}
if !config_dir.exists() {
std::fs::create_dir_all(&config_dir)?;
}
tracing::info!(
config_dir = %config_dir.display(),
socket = %socket_path.display(),
pid1_mode = args.pid1_mode,
"starting zinit-server"
);
let mut supervisor =
Supervisor::new(config_dir.clone(), socket_path.clone(), args.pid1_mode).await?;
let graph = supervisor.graph();
let log_buffers = supervisor.log_buffers();
let (command_tx, mut command_rx) = mpsc::channel::<IpcCommand>(64);
let ipc_socket_path = socket_path.clone();
let ipc_config_dir = config_dir.clone();
let ipc_command_tx = command_tx.clone();
let xinet_graph = Arc::clone(&graph);
let xinet_start_tx = command_tx.clone();
let xinet_stop_tx = command_tx.clone();
let start_service: StartServiceFn = Arc::new(move |name: &str| {
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
xinet_start_tx
.blocking_send(IpcCommand::StartService {
name: name.to_string(),
response_tx,
})
.is_ok()
});
let stop_service: StopServiceFn = Arc::new(move |name: &str| {
let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
xinet_stop_tx
.blocking_send(IpcCommand::StopService {
name: name.to_string(),
response_tx,
})
.is_ok()
});
let is_running: IsRunningFn = Arc::new(move |name: &str| {
match xinet_graph.try_read() {
Ok(g) => {
if let Some(id) = g.get_by_name(name)
&& let Some(service) = g.get(id)
{
return matches!(service.state, ServiceState::Running { .. });
}
false
}
Err(_) => false, }
});
let xinet_manager = Some(Arc::new(XinetManager::new(
start_service,
stop_service,
is_running,
)));
let ipc_handle = tokio::spawn(async move {
if let Err(e) = ipc::run_ipc_server(
&ipc_socket_path,
ipc_config_dir,
graph,
log_buffers,
ipc_command_tx,
xinet_manager,
)
.await
{
tracing::error!(error = %e, "IPC server error");
}
});
let syslog_handle = if syslog::should_enable_syslog(args.pid1_mode) {
let syslog_graph = supervisor.graph();
let syslog_log_buffers = supervisor.log_buffers();
let syslog_path = syslog::default_socket_path();
let receiver = SyslogReceiver::new(syslog_path, syslog_graph, syslog_log_buffers);
Some(tokio::spawn(async move {
if let Err(e) = receiver.run().await {
tracing::error!(error = %e, "syslog receiver error");
}
}))
} else {
tracing::debug!("syslog receiver disabled (not in PID 1 mode)");
None
};
let (shutdown_tx, shutdown_rx) = oneshot::channel::<ShutdownType>();
let event_tx = supervisor.event_tx();
let command_handle = tokio::spawn(async move {
use zinit::server::supervisor::SupervisorEvent;
while let Some(command) = command_rx.recv().await {
match command {
IpcCommand::Shutdown => {
tracing::info!("shutdown requested via IPC");
let _ = shutdown_tx.send(ShutdownType::Normal);
break;
}
IpcCommand::StartService { name, response_tx } => {
let _ = event_tx
.send(SupervisorEvent::StartService { name, response_tx })
.await;
}
IpcCommand::StopService { name, response_tx } => {
let _ = event_tx
.send(SupervisorEvent::StopService { name, response_tx })
.await;
}
IpcCommand::RestartService { name, response_tx } => {
let _ = event_tx
.send(SupervisorEvent::RestartService { name, response_tx })
.await;
}
IpcCommand::KillService {
name,
signal,
response_tx,
} => {
let _ = event_tx
.send(SupervisorEvent::KillService {
name,
signal,
response_tx,
})
.await;
}
IpcCommand::AddService {
config,
persist: _,
response_tx,
} => {
let _ = event_tx
.send(SupervisorEvent::AddService {
config,
response_tx,
})
.await;
}
IpcCommand::RemoveService { name, response_tx } => {
let _ = event_tx
.send(SupervisorEvent::RemoveService { name, response_tx })
.await;
}
IpcCommand::Reload { response_tx } => {
let _ = event_tx.send(SupervisorEvent::Reload { response_tx }).await;
}
IpcCommand::PrepareRestart { response_tx } => {
tracing::info!("prepare_restart requested via IPC");
let _ = event_tx
.send(SupervisorEvent::PrepareRestart { response_tx })
.await;
let _ = shutdown_tx.send(ShutdownType::PrepareRestart);
break;
}
}
}
});
tokio::select! {
result = supervisor.run() => {
if let Err(e) = result {
tracing::error!(error = %e, "supervisor error");
}
}
shutdown_type = shutdown_rx => {
match shutdown_type {
Ok(ShutdownType::Normal) => {
tracing::info!("shutdown signal received, stopping all services");
supervisor.stop_all_services().await;
}
Ok(ShutdownType::PrepareRestart) => {
tracing::info!("prepare_restart complete, exiting without stopping services");
}
Err(_) => {
tracing::warn!("shutdown channel closed unexpectedly");
}
}
}
}
ipc_handle.abort();
command_handle.abort();
if let Some(handle) = syslog_handle {
handle.abort();
}
let _ = std::fs::remove_file(&socket_path);
tracing::info!("zinit-server stopped");
Ok(())
}