// zinit 0.3.9
//
// Process supervisor with dependency management
// Documentation
//! zinit-server - Process supervisor daemon.

use std::path::PathBuf;
use std::sync::Arc;

use clap::Parser;
use tokio::sync::{mpsc, oneshot};
use tracing_subscriber::EnvFilter;

use zinit::sdk::{ServiceState, socket};
use zinit::server::ipc::{self, IpcCommand};
use zinit::server::supervisor::Supervisor;
use zinit::server::syslog::{self, SyslogReceiver};
use zinit::server::xinet::{IsRunningFn, StartServiceFn, StopServiceFn, XinetManager};

/// How the server should wind down once the main loop is told to exit.
///
/// The value travels over a one-shot channel from the command-handling task
/// to `main`, which decides whether supervised services are stopped.
#[derive(Clone, Copy, Debug)]
enum ShutdownType {
    /// Regular shutdown: every service is stopped before the process exits.
    Normal,
    /// Exit in preparation for a restart: services are left running and only
    /// the server process goes away.
    PrepareRestart,
}

// Command-line arguments for the server binary, parsed by clap's derive API.
//
// NOTE(review): clap's derive normally surfaces the `///` comments on the
// fields below as `--help` text, so treat them as user-facing strings rather
// than internal documentation when editing — confirm against the clap version
// in use.
/// zinit-server - Process supervisor with dependency management.
#[derive(Parser, Debug)]
#[command(name = "zinit-server")]
#[command(version, about, long_about = None)]
struct Args {
    // `None` means "not given on the command line"; `main` then picks the
    // system or user default depending on `pid1_mode`.
    /// Configuration directory containing service definitions.
    /// Defaults to ~/hero/zinit/services (standalone) or /etc/zinit/services (pid1 mode).
    #[arg(short, long)]
    config_dir: Option<PathBuf>,

    // Same defaulting scheme as `config_dir`: resolved in `main` from
    // `pid1_mode` when absent.
    /// Path to the Unix socket for IPC.
    /// Defaults to ~/hero/zinit/zinit.sock (standalone) or /var/run/zinit.sock (pid1 mode).
    #[arg(short, long)]
    socket: Option<PathBuf>,

    // Only used as the fallback filter in `main`: a filter taken from the
    // environment (`EnvFilter::try_from_default_env`) wins when it parses.
    /// Log level (trace, debug, info, warn, error).
    #[arg(long, default_value = "info")]
    log_level: String,

    // Besides selecting the default paths above, this flag is passed to
    // `Supervisor::new` and to `syslog::should_enable_syslog` in `main`.
    /// PID1 mode: load system services from /etc/zinit/system first.
    /// System services are protected from bulk operations.
    #[arg(long)]
    pid1_mode: bool,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    // Determine paths based on mode
    let config_dir = args.config_dir.unwrap_or_else(|| {
        if args.pid1_mode {
            socket::system_config_dir()
        } else {
            socket::user_config_dir()
        }
    });

    let socket_path = args.socket.unwrap_or_else(|| {
        if args.pid1_mode {
            socket::system_path()
        } else {
            socket::user_path()
        }
    });

    // Initialize logging
    let filter =
        EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new(&args.log_level));

    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(false)
        .init();

    // Ensure parent directories exist
    if let Some(parent) = socket_path.parent()
        && !parent.exists()
    {
        std::fs::create_dir_all(parent)?;
    }
    if !config_dir.exists() {
        std::fs::create_dir_all(&config_dir)?;
    }

    tracing::info!(
        config_dir = %config_dir.display(),
        socket = %socket_path.display(),
        pid1_mode = args.pid1_mode,
        "starting zinit-server"
    );

    // Create the supervisor
    let mut supervisor =
        Supervisor::new(config_dir.clone(), socket_path.clone(), args.pid1_mode).await?;

    // Get shared references for IPC (these are internally locked)
    let graph = supervisor.graph();
    let log_buffers = supervisor.log_buffers();

    // Create command channel for IPC -> Supervisor communication
    let (command_tx, mut command_rx) = mpsc::channel::<IpcCommand>(64);

    // Start IPC server in background
    let ipc_socket_path = socket_path.clone();
    let ipc_config_dir = config_dir.clone();
    let ipc_command_tx = command_tx.clone();

    // Create xinet callbacks for service lifecycle management
    let xinet_graph = Arc::clone(&graph);
    let xinet_start_tx = command_tx.clone();
    let xinet_stop_tx = command_tx.clone();

    // Start service callback - sends command to supervisor
    let start_service: StartServiceFn = Arc::new(move |name: &str| {
        let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
        xinet_start_tx
            .blocking_send(IpcCommand::StartService {
                name: name.to_string(),
                response_tx,
            })
            .is_ok()
    });

    // Stop service callback
    let stop_service: StopServiceFn = Arc::new(move |name: &str| {
        let (response_tx, _response_rx) = tokio::sync::oneshot::channel();
        xinet_stop_tx
            .blocking_send(IpcCommand::StopService {
                name: name.to_string(),
                response_tx,
            })
            .is_ok()
    });

    // Check if service is running - reads graph state
    let is_running: IsRunningFn = Arc::new(move |name: &str| {
        // Use try_read to avoid blocking in async context
        match xinet_graph.try_read() {
            Ok(g) => {
                if let Some(id) = g.get_by_name(name)
                    && let Some(service) = g.get(id)
                {
                    return matches!(service.state, ServiceState::Running { .. });
                }
                false
            }
            Err(_) => false, // Lock contended, assume not running
        }
    });

    let xinet_manager = Some(Arc::new(XinetManager::new(
        start_service,
        stop_service,
        is_running,
    )));

    let ipc_handle = tokio::spawn(async move {
        if let Err(e) = ipc::run_ipc_server(
            &ipc_socket_path,
            ipc_config_dir,
            graph,
            log_buffers,
            ipc_command_tx,
            xinet_manager,
        )
        .await
        {
            tracing::error!(error = %e, "IPC server error");
        }
    });

    // Start syslog receiver if enabled (PID 1 mode)
    let syslog_handle = if syslog::should_enable_syslog(args.pid1_mode) {
        let syslog_graph = supervisor.graph();
        let syslog_log_buffers = supervisor.log_buffers();
        let syslog_path = syslog::default_socket_path();

        let receiver = SyslogReceiver::new(syslog_path, syslog_graph, syslog_log_buffers);

        Some(tokio::spawn(async move {
            if let Err(e) = receiver.run().await {
                tracing::error!(error = %e, "syslog receiver error");
            }
        }))
    } else {
        tracing::debug!("syslog receiver disabled (not in PID 1 mode)");
        None
    };

    // Create shutdown signal channel (with shutdown type)
    let (shutdown_tx, shutdown_rx) = oneshot::channel::<ShutdownType>();

    // Get the supervisor's event sender for spawning command handlers
    let event_tx = supervisor.event_tx();

    // Spawn command handler task
    let command_handle = tokio::spawn(async move {
        use zinit::server::supervisor::SupervisorEvent;

        while let Some(command) = command_rx.recv().await {
            match command {
                IpcCommand::Shutdown => {
                    tracing::info!("shutdown requested via IPC");
                    let _ = shutdown_tx.send(ShutdownType::Normal);
                    break;
                }
                IpcCommand::StartService { name, response_tx } => {
                    let _ = event_tx
                        .send(SupervisorEvent::StartService { name, response_tx })
                        .await;
                }
                IpcCommand::StopService { name, response_tx } => {
                    let _ = event_tx
                        .send(SupervisorEvent::StopService { name, response_tx })
                        .await;
                }
                IpcCommand::RestartService { name, response_tx } => {
                    let _ = event_tx
                        .send(SupervisorEvent::RestartService { name, response_tx })
                        .await;
                }
                IpcCommand::KillService {
                    name,
                    signal,
                    response_tx,
                } => {
                    let _ = event_tx
                        .send(SupervisorEvent::KillService {
                            name,
                            signal,
                            response_tx,
                        })
                        .await;
                }
                IpcCommand::AddService {
                    config,
                    persist: _,
                    response_tx,
                } => {
                    // Persistence is handled by IPC layer, just add to graph
                    let _ = event_tx
                        .send(SupervisorEvent::AddService {
                            config,
                            response_tx,
                        })
                        .await;
                }
                IpcCommand::RemoveService { name, response_tx } => {
                    let _ = event_tx
                        .send(SupervisorEvent::RemoveService { name, response_tx })
                        .await;
                }
                IpcCommand::Reload { response_tx } => {
                    let _ = event_tx.send(SupervisorEvent::Reload { response_tx }).await;
                }
                IpcCommand::PrepareRestart { response_tx } => {
                    tracing::info!("prepare_restart requested via IPC");
                    let _ = event_tx
                        .send(SupervisorEvent::PrepareRestart { response_tx })
                        .await;
                    // After prepare_restart, exit without stopping services
                    let _ = shutdown_tx.send(ShutdownType::PrepareRestart);
                    break;
                }
            }
        }
    });

    // Run supervisor until shutdown signal or error
    tokio::select! {
        result = supervisor.run() => {
            if let Err(e) = result {
                tracing::error!(error = %e, "supervisor error");
            }
        }
        shutdown_type = shutdown_rx => {
            match shutdown_type {
                Ok(ShutdownType::Normal) => {
                    tracing::info!("shutdown signal received, stopping all services");
                    supervisor.stop_all_services().await;
                }
                Ok(ShutdownType::PrepareRestart) => {
                    tracing::info!("prepare_restart complete, exiting without stopping services");
                    // Don't stop services - let them continue as orphans
                }
                Err(_) => {
                    tracing::warn!("shutdown channel closed unexpectedly");
                }
            }
        }
    }

    // Cleanup
    ipc_handle.abort();
    command_handle.abort();
    if let Some(handle) = syslog_handle {
        handle.abort();
    }

    // Remove socket file
    let _ = std::fs::remove_file(&socket_path);

    tracing::info!("zinit-server stopped");
    Ok(())
}