// zinit 0.3.6
//
// Process supervisor with dependency management
// Documentation
//! State persistence and shutdown handling.

use std::collections::HashMap;
use std::time::Duration;

use crate::sdk::{PrepareRestartResult, ServiceState};
use nix::sys::signal::Signal;

use crate::server::graph::ServiceId;
use crate::server::process::{process_exists, send_signal_to_group};
use crate::server::state::{
    PersistedService, PersistedServiceState, PersistedState, STATE_VERSION, now_millis, save_state,
};

use super::Supervisor;

impl Supervisor {
    /// Re-key the supervisor's per-service bookkeeping after a graph reload.
    ///
    /// Each old [`ServiceId`] is translated to a service name through
    /// `old_id_to_name`, and that name is translated to its new id through
    /// `name_to_new_id`. An entry whose id fails either lookup is treated as
    /// belonging to a removed service: its process task or timer handle is
    /// aborted, while its health-attempt count, pending-restart mark and log
    /// buffer are simply dropped.
    pub(crate) async fn remap_service_ids(
        &mut self,
        old_id_to_name: &HashMap<ServiceId, String>,
        name_to_new_id: &HashMap<String, ServiceId>,
    ) {
        // old id -> name -> new id; `None` as soon as either hop is missing.
        let translate = |stale: ServiceId| -> Option<ServiceId> {
            let name = old_id_to_name.get(&stale)?;
            name_to_new_id.get(name).copied()
        };

        // Process tasks: carry surviving handles over, abort the rest.
        for (stale, task) in std::mem::take(&mut self.process_tasks) {
            match translate(stale) {
                Some(fresh) => {
                    self.process_tasks.insert(fresh, task);
                }
                None => {
                    task.abort();
                }
            }
        }

        // Timers are keyed by (service id, kind); only the id part changes.
        for ((stale, kind), timer) in std::mem::take(&mut self.timers) {
            match translate(stale) {
                Some(fresh) => {
                    self.timers.insert((fresh, kind), timer);
                }
                None => {
                    timer.abort();
                }
            }
        }

        // Health attempt counters: counts of removed services are discarded.
        for (stale, attempts) in std::mem::take(&mut self.health_attempts) {
            let Some(fresh) = translate(stale) else {
                continue;
            };
            self.health_attempts.insert(fresh, attempts);
        }

        // Pending restarts: ids of removed services fall out of the set.
        let stale_pending = std::mem::take(&mut self.pending_restarts);
        for fresh in stale_pending.into_iter().filter_map(&translate) {
            self.pending_restarts.insert(fresh);
        }

        // Log buffers (shared with IPC handlers): re-key under the write lock,
        // which is released at the end of this scope.
        {
            let mut live = self.log_buffers.write().await;
            for (stale, buffer) in std::mem::take(&mut *live) {
                if let Some(fresh) = translate(stale) {
                    live.insert(fresh, buffer);
                }
            }
        }
    }

    /// Shut down every service that is active or still reports a PID.
    ///
    /// Services are visited in the order returned by the graph's
    /// `shutdown_order()` (reverse topological: dependents before their
    /// dependencies) and handled one at a time:
    ///
    /// - no PID: the state is set to `Inactive`;
    /// - PID `0` (a target, i.e. a virtual service): logged, then set to
    ///   `Inactive`;
    /// - any other PID: `SIGTERM` is sent through `send_signal_to_group`, the
    ///   state becomes `Stopping`, the PID is polled every 100 ms for up to 50
    ///   polls (about 5 seconds), `SIGKILL` is sent if it is still present
    ///   after that, and the state is finally set to `Exited` with no exit
    ///   code.
    ///
    /// The results of the signal calls are ignored.
    pub async fn stop_all_services(&mut self) {
        /// Pause between two liveness checks while waiting for a service to exit.
        const POLL_INTERVAL: Duration = Duration::from_millis(100);
        /// Number of liveness checks before escalating to SIGKILL.
        const MAX_POLLS: u32 = 50;
        /// Short pause granted after SIGKILL has been sent.
        const KILL_GRACE: Duration = Duration::from_millis(100);

        // Snapshot the work list under a read lock; the lock is released
        // before any signalling or waiting happens.
        let plan: Vec<(ServiceId, String, Option<u32>)> = {
            let graph = self.graph.read().await;
            graph
                .shutdown_order()
                .into_iter()
                .filter_map(|id| {
                    let svc = graph.get(id)?;
                    // Keep every active service, not only those with PIDs
                    // (targets have pid=0).
                    let wanted = svc.state.is_active() || svc.state.pid().is_some();
                    wanted.then(|| (id, svc.name.clone(), svc.state.pid()))
                })
                .collect()
        };

        if plan.is_empty() {
            tracing::info!("no running services to stop");
            return;
        }

        tracing::info!(
            count = plan.len(),
            "stopping all services in dependency order"
        );

        // Strictly sequential: a service is fully dealt with before the next
        // one in shutdown order is touched.
        for (id, name, pid_opt) in &plan {
            match *pid_opt {
                // Nothing to signal - just record the service as inactive.
                None => {
                    self.graph
                        .write()
                        .await
                        .set_state(*id, ServiceState::Inactive);
                }

                // Target (virtual service): there is no process behind it.
                Some(0) => {
                    tracing::info!(service = %name, "marking target as inactive");
                    self.graph
                        .write()
                        .await
                        .set_state(*id, ServiceState::Inactive);
                }

                Some(pid) => {
                    tracing::info!(service = %name, pgid = pid, "stopping service (SIGTERM)");
                    let _ = send_signal_to_group(pid, Signal::SIGTERM);

                    self.graph
                        .write()
                        .await
                        .set_state(*id, ServiceState::Stopping { pid });

                    // Sleep first, then check, until the process is gone or
                    // the poll budget (~5 seconds) is used up.
                    let mut gone = false;
                    let mut polls_left = MAX_POLLS;
                    while polls_left > 0 && !gone {
                        tokio::time::sleep(POLL_INTERVAL).await;
                        gone = !process_exists(pid);
                        polls_left -= 1;
                    }

                    if !gone {
                        tracing::warn!(service = %name, pgid = pid, "stop timeout, sending SIGKILL");
                        let _ = send_signal_to_group(pid, Signal::SIGKILL);
                        tokio::time::sleep(KILL_GRACE).await;
                    }

                    // Recorded as exited whether or not the kill was needed.
                    self.graph
                        .write()
                        .await
                        .set_state(*id, ServiceState::Exited { exit_code: None });
                }
            }
        }

        tracing::info!("all services stopped");
    }

    /// Capture a [`PersistedState`] snapshot of the supervisor for restart.
    ///
    /// A read lock on the service graph is held while one
    /// [`PersistedService`] is built for every id from `all_services()` that
    /// the graph can resolve; each record is paired with the service name.
    /// A service's config is copied into the record only when the service is
    /// ephemeral; otherwise `config` is `None`.
    ///
    /// The snapshot also carries `STATE_VERSION`, the `now_millis()`
    /// timestamp, the supervisor's boot time, and the config directory and
    /// socket path rendered as strings.
    pub async fn serialize_state(&self) -> PersistedState {
        let graph = self.graph.read().await;

        let services = graph
            .all_services()
            // Ids the graph cannot resolve are skipped.
            .filter_map(|id| graph.get(id))
            .map(|svc| {
                let record = PersistedService {
                    name: svc.name.clone(),
                    state: PersistedServiceState::from(&svc.state),
                    pid: svc.state.pid(),
                    restart_count: svc.restart_count,
                    current_restart_delay_ms: svc.current_restart_delay_ms,
                    last_exit_code: svc.last_exit_code,
                    last_exit_signal: svc.last_exit_signal,
                    started_at: svc.started_at,
                    last_state_change: svc.last_state_change,
                    ephemeral: svc.ephemeral,
                    // Config is persisted for ephemeral services only.
                    config: svc
                        .ephemeral
                        .then(|| svc.service_config().cloned())
                        .flatten(),
                };
                (svc.name.clone(), record)
            })
            .collect();

        PersistedState {
            version: STATE_VERSION,
            saved_at: now_millis(),
            boot_time: self.boot_time,
            services,
            config_dir: self.config_dir.display().to_string(),
            socket_path: self.socket_path.display().to_string(),
        }
    }

    /// Snapshot the supervisor with `serialize_state` and persist it via
    /// `save_state` so it is available for a restart.
    ///
    /// # Errors
    ///
    /// Propagates the error from `save_state`; the success message is not
    /// logged in that case.
    pub async fn save_state_to_disk(&self) -> Result<(), std::io::Error> {
        let snapshot = self.serialize_state().await;
        let service_count = snapshot.services.len();

        save_state(&snapshot)?;

        tracing::info!(
            services = service_count,
            path = crate::server::state::STATE_PATH,
            "state saved for restart"
        );
        Ok(())
    }

    /// Persist the current state and report where it was written.
    ///
    /// On success the result carries `STATE_PATH` as `state_path` and
    /// `ready: true`.
    ///
    /// # Errors
    ///
    /// Returns `"Failed to save state: <error>"` when `save_state_to_disk`
    /// fails.
    pub async fn prepare_restart(&self) -> Result<PrepareRestartResult, String> {
        match self.save_state_to_disk().await {
            Ok(()) => Ok(PrepareRestartResult {
                state_path: crate::server::state::STATE_PATH.to_string(),
                ready: true,
            }),
            Err(e) => Err(format!("Failed to save state: {}", e)),
        }
    }
}