zinit 0.3.8

Process supervisor with dependency management
Documentation
//! Public API methods for IPC.

use std::collections::HashMap;
use std::time::Duration;

use crate::sdk::{LogLine, ReloadResult, ServiceConfig, ServiceState, ServiceStatus, WhyBlocked};

use crate::server::error::{SupervisorError, SupervisorResult};
use crate::server::graph::{Service, ServiceId};
use crate::server::log;
use crate::server::process::{parse_signal, send_signal_to_group};

use super::SYSTEM_CONFIG_DIR;
use super::Supervisor;
use super::events::TimeoutKind;

impl Supervisor {
    /// List the names of all services currently in the dependency graph.
    pub async fn list_services(&self) -> Vec<String> {
        let graph = self.graph.read().await;
        graph
            .all_services()
            .filter_map(|id| graph.get(id).map(|s| s.name.clone()))
            .collect()
    }

    /// Get a simplified status snapshot for the service called `name`.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn get_status(&self, name: &str) -> SupervisorResult<ServiceStatus> {
        let graph = self.graph.read().await;
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;

        let service = graph
            .get(id)
            .expect("id returned by get_by_name must resolve under the same lock");
        Ok(ServiceStatus::from_state(service.name.clone(), &service.state))
    }

    /// Request that a service be started.
    ///
    /// The request is handed to `try_start_service`; this method returns
    /// `Ok(())` once that call completes, without reporting whether the
    /// service actually started.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn start_service(&mut self, name: &str) -> SupervisorResult<()> {
        let id = {
            let graph = self.graph.read().await;
            graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?
        };

        self.try_start_service(id).await;
        Ok(())
    }

    /// Stop a service, cascading to its dependents.
    ///
    /// All running services that (transitively) depend on this one are stopped
    /// first, most-dependent first, and then this service is stopped.
    /// Dependencies (services this one depends on) are NOT stopped.
    ///
    /// Failures while stopping a dependent are logged and do not abort the
    /// cascade.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::ServiceNotFound`] if no service has that name,
    /// or any error from stopping the requested service itself.
    pub async fn stop_service(&mut self, name: &str) -> SupervisorResult<()> {
        let dependents_to_stop: Vec<String> = {
            let graph = self.graph.read().await;
            let id = graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;

            // All transitive dependents, ordered most-dependent first.
            graph
                .all_dependents_ordered(id)
                .into_iter()
                .filter_map(|dep_id| {
                    let dep = graph.get(dep_id)?;
                    // Only services that are active or still own a process.
                    if dep.state.is_active() || dep.state.pid().is_some() {
                        Some(dep.name.clone())
                    } else {
                        None
                    }
                })
                .collect()
        };

        if !dependents_to_stop.is_empty() {
            tracing::info!(
                service = %name,
                dependents = ?dependents_to_stop,
                "stopping dependents first"
            );

            for dep_name in dependents_to_stop {
                // The list is already the full transitive closure in stop order,
                // so each dependent is stopped on its own (no further cascade).
                if let Err(e) = self.stop_single_service(&dep_name).await {
                    tracing::warn!(
                        service = %dep_name,
                        error = %e,
                        "failed to stop dependent service"
                    );
                }
            }
        }

        self.stop_single_service(name).await
    }

    /// Stop a single service without any cascade to dependents.
    ///
    /// - Already `Inactive`, `Exited` or `Stopping`: nothing is done.
    /// - No pid, or pid 0 (targets): the state is set to `Inactive` directly,
    ///   no signal is sent.
    /// - Otherwise the configured stop signal (default `"SIGTERM"`) is sent to
    ///   the service's process group, the state becomes `Stopping { pid }`, and
    ///   a stop timeout (default 10000 ms) is scheduled.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::ServiceNotFound`] for unknown names, and
    /// propagates errors from `parse_signal` / `send_signal_to_group`. In that
    /// case the state is left unchanged.
    pub(crate) async fn stop_single_service(&mut self, name: &str) -> SupervisorResult<()> {
        let (id, pid, stop_signal, stop_timeout) = {
            let graph = self.graph.read().await;
            let id = graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;

            let service = graph
                .get(id)
                .expect("id returned by get_by_name must resolve under the same lock");

            if matches!(
                service.state,
                ServiceState::Inactive
                    | ServiceState::Exited { .. }
                    | ServiceState::Stopping { .. }
            ) {
                tracing::debug!(service = %name, state = %service.state.name(), "service already stopped or stopping");
                return Ok(());
            }

            // Targets carry pid 0, and a service without a pid owns no
            // process: there is nothing to signal, so just mark it inactive.
            let signalable_pid = service.state.pid().filter(|&pid| pid != 0);
            let Some(pid) = signalable_pid else {
                drop(graph);
                let mut graph = self.graph.write().await;
                graph.set_state(id, ServiceState::Inactive);
                tracing::info!(service = %name, "target/service marked inactive");
                return Ok(());
            };

            let stop_signal = service
                .service_config()
                .map(|c| c.lifecycle.stop_signal.clone())
                .unwrap_or_else(|| "SIGTERM".to_string());

            let stop_timeout = service
                .service_config()
                .map(|c| c.lifecycle.stop_timeout_ms)
                .unwrap_or(10000);

            (id, pid, stop_signal, stop_timeout)
        };

        let sig = parse_signal(&stop_signal)?;
        send_signal_to_group(pid, sig)?;

        {
            let mut graph = self.graph.write().await;
            graph.set_state(id, ServiceState::Stopping { pid });
        }

        self.schedule_timeout(id, TimeoutKind::Stop, stop_timeout);

        tracing::info!(service = %name, signal = %stop_signal, pgid = pid, "stopping service (process group)");
        Ok(())
    }

    /// Restart a service.
    ///
    /// This method only performs [`Self::stop_service`], including its cascade
    /// to running dependents. It does not start the service again itself.
    ///
    /// NOTE(review): presumably the service is brought back elsewhere (e.g. by
    /// the exit/restart-policy handling once the process has exited). Confirm
    /// this. If the service is not running, nothing is started by this call.
    ///
    /// # Errors
    ///
    /// Same as [`Self::stop_service`].
    pub async fn restart_service(&mut self, name: &str) -> SupervisorResult<()> {
        self.stop_service(name).await
    }

    /// Send a signal (default `"SIGTERM"`) to a service's process group.
    ///
    /// # Errors
    ///
    /// - [`SupervisorError::ServiceNotFound`] if no service has that name.
    /// - [`SupervisorError::ServiceNotRunning`] if the service has no pid, or
    ///   has pid 0 (a target with no process).
    /// - Any error from `parse_signal` / `send_signal_to_group`.
    pub async fn kill_service(&mut self, name: &str, signal: Option<&str>) -> SupervisorResult<()> {
        let pid = {
            let graph = self.graph.read().await;
            let id = graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;

            let service = graph
                .get(id)
                .expect("id returned by get_by_name must resolve under the same lock");

            // pid 0 marks a target (see stop_single_service). It must never be
            // signalled: with kill(-pgid)/killpg semantics, a group id of 0
            // addresses the supervisor's *own* process group.
            service
                .state
                .pid()
                .filter(|&pid| pid != 0)
                .ok_or_else(|| SupervisorError::ServiceNotRunning(name.to_string()))?
        };

        let sig = parse_signal(signal.unwrap_or("SIGTERM"))?;
        send_signal_to_group(pid, sig)?;

        tracing::info!(service = %name, signal = ?sig, pgid = pid, "sent signal to process group");
        Ok(())
    }

    /// Explain why a service is (or would be) blocked from starting.
    ///
    /// `blocked` reflects whether the current state is `Blocked`. The
    /// `waiting_on` / `conflicts_with` lists come from `graph.can_start` and
    /// are empty when the service could start.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn why_blocked(&self, name: &str) -> SupervisorResult<WhyBlocked> {
        let graph = self.graph.read().await;
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?;

        let service = graph
            .get(id)
            .expect("id returned by get_by_name must resolve under the same lock");
        let blocked = matches!(service.state, ServiceState::Blocked { .. });

        let (waiting_on, conflicts_with) = match graph.can_start(id) {
            Ok(()) => (vec![], vec![]),
            Err(reason) => (reason.waiting_on(), reason.conflicts_with()),
        };

        let ascii = graph
            .format_why_blocked(name)
            .unwrap_or_else(|| "Unknown service".to_string());

        Ok(WhyBlocked {
            name: name.to_string(),
            blocked,
            waiting_on,
            conflicts_with,
            ascii,
        })
    }

    /// Render the dependency tree as text.
    pub async fn get_tree(&self) -> String {
        let graph = self.graph.read().await;
        graph.format_tree()
    }

    /// Validate and add a new service, starting it if it should autostart.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::Validation`] with all validation messages
    /// joined by `", "`, or propagates the error from `graph.add_service`.
    pub async fn add_service(&mut self, config: ServiceConfig) -> SupervisorResult<()> {
        let errors = crate::sdk::validate::validate_service(&config);
        if !errors.is_empty() {
            return Err(SupervisorError::Validation(errors.join(", ")));
        }

        let (id, should_autostart) = {
            let mut graph = self.graph.write().await;
            let service = Service::from_service(config);
            let autostart = service.should_autostart();
            let id = graph.add_service(service)?;
            (id, autostart)
        };

        // Only services whose status is "start" (the default) are started
        // here; "stop" / "ignore" services are just registered.
        if should_autostart {
            self.try_start_service(id).await;
        } else {
            tracing::debug!(service_id = ?id, "service added with status != start, not auto-starting");
        }

        Ok(())
    }

    /// Remove a service from the graph, stopping it first if it is active.
    ///
    /// Stopping goes through [`Self::stop_service`], so running dependents are
    /// stopped as well.
    ///
    /// NOTE(review): after requesting the stop, this waits a fixed 100 ms and
    /// then removes the service. The stop timeout can be much longer (default
    /// 10000 ms), so the process may still be running when it is removed from
    /// the graph. Consider waiting for the actual exit.
    ///
    /// # Errors
    ///
    /// Propagates errors from [`Self::stop_service`] and from
    /// `graph.remove_service` (e.g. for an unknown name).
    pub async fn remove_service(&mut self, name: &str) -> SupervisorResult<()> {
        let is_active = {
            let graph = self.graph.read().await;
            graph
                .get_by_name(name)
                .and_then(|id| graph.get(id))
                .is_some_and(|service| service.state.is_active())
        };

        if is_active {
            self.stop_service(name).await?;
            tokio::time::sleep(Duration::from_millis(100)).await;
        }

        let mut graph = self.graph.write().await;
        graph.remove_service(name)?;
        Ok(())
    }

    /// Reload service configuration from disk.
    ///
    /// In PID 1 mode, both `SYSTEM_CONFIG_DIR` and the user config directory
    /// are loaded, and the set of system service names is refreshed.
    /// Otherwise only the user config directory is used. Reloading may assign
    /// new `ServiceId`s, so every id-keyed map is remapped by service name
    /// afterwards.
    ///
    /// # Errors
    ///
    /// Propagates errors from the graph reload. The id remapping is then
    /// skipped.
    pub async fn reload(&mut self) -> SupervisorResult<ReloadResult> {
        let mut graph = self.graph.write().await;

        // Snapshot old id -> name before the graph is rebuilt.
        let old_id_to_name: HashMap<ServiceId, String> = graph
            .all_services()
            .filter_map(|id| graph.get(id).map(|s| (id, s.name.clone())))
            .collect();

        let result = if self.pid1_mode {
            let system_dir = std::path::Path::new(SYSTEM_CONFIG_DIR);
            let (result, new_system_names) =
                graph.reload_from_directories(Some(system_dir), &self.config_dir)?;
            self.system_service_names = new_system_names;
            result
        } else {
            graph.reload_from_directory(&self.config_dir)?
        };

        // name -> new id, to translate the old snapshot.
        let name_to_new_id: HashMap<String, ServiceId> = graph
            .all_services()
            .filter_map(|id| graph.get(id).map(|s| (s.name.clone(), id)))
            .collect();

        // Release the write lock before remapping, which is async.
        drop(graph);

        self.remap_service_ids(&old_id_to_name, &name_to_new_id)
            .await;

        tracing::info!(
            added = result.added.len(),
            removed = result.removed.len(),
            changed = result.changed.len(),
            pid1_mode = self.pid1_mode,
            "configuration reloaded"
        );

        Ok(result)
    }

    /// Get buffered log lines for a service.
    ///
    /// `lines` is passed through to `log::get_logs` unchanged.
    ///
    /// # Errors
    ///
    /// Returns [`SupervisorError::ServiceNotFound`] if no service has that name.
    pub async fn get_logs(
        &self,
        name: &str,
        lines: Option<usize>,
    ) -> SupervisorResult<Vec<LogLine>> {
        // Resolve the id in a scope so the graph lock is released before
        // reading the log buffers.
        let id = {
            let graph = self.graph.read().await;
            graph
                .get_by_name(name)
                .ok_or_else(|| SupervisorError::ServiceNotFound(name.to_string()))?
        };

        Ok(log::get_logs(&self.log_buffers, id, lines).await)
    }
}