zinit 0.3.7

Process supervisor with dependency management
Documentation
//! Process spawning and health check execution.

use crate::sdk::{FailureReason, ServiceConfig, ServiceState};

use crate::server::graph::ServiceId;
use crate::server::log;
use crate::server::process::{self, SpawnResult, check_health};

use super::Supervisor;
use super::events::{StartAction, SupervisorEvent, TimeoutKind};

impl Supervisor {
    /// Try to start a service if possible.
    pub(crate) async fn try_start_service(&mut self, id: ServiceId) {
        // Collect all info needed under lock, then release it
        let action = {
            let mut graph = self.graph.write().await;

            let service = match graph.get(id) {
                Some(s) => s,
                None => return,
            };

            // Check desired status - if "stop" or "ignore", don't auto-start
            if !service.should_autostart() {
                tracing::trace!(service = %service.name, "skipping start: status != start");
                return;
            }

            // Check if we can attempt start
            if !service.state.can_attempt_start() {
                tracing::debug!(
                    service = %service.name,
                    state = %service.state,
                    "skipping start: state does not allow start attempts"
                );
                return;
            }

            // Check dependencies
            match graph.can_start(id) {
                Ok(()) => {
                    // Can start!
                    if service.is_target() {
                        // Targets go directly to Running
                        let name = service.name.clone();
                        graph.set_state(id, ServiceState::Running { pid: 0 });
                        let dependents = graph.dependents(id);
                        StartAction::TargetReady { name, dependents }
                    } else {
                        // Real service needs to be spawned
                        let config = service.service_config().cloned();
                        graph.set_state(id, ServiceState::Starting { pid: 0 });
                        StartAction::SpawnProcess { config }
                    }
                }
                Err(reason) => {
                    // Blocked
                    let name = service.name.clone();
                    graph.set_state(
                        id,
                        ServiceState::Blocked {
                            waiting_on: reason.waiting_on(),
                        },
                    );
                    StartAction::Blocked {
                        name,
                        reason: reason.to_string(),
                    }
                }
            }
        };
        // Lock released here

        // Now perform actions without holding the lock
        match action {
            StartAction::TargetReady { name, dependents } => {
                tracing::info!(service = %name, "target satisfied");
                self.queue_reevaluate(dependents).await;
            }
            StartAction::SpawnProcess { config } => {
                if let Some(config) = config {
                    // Check for builtin service
                    if let Some(builtin) = Self::is_builtin_service(&config) {
                        self.run_builtin_service(id, builtin, &config).await;
                    } else {
                        self.spawn_service(id, config).await;
                    }
                }
            }
            StartAction::Blocked { name, reason } => {
                tracing::debug!(service = %name, reason = %reason, "service blocked");
            }
        }
    }

    /// Queue services for re-evaluation.
    pub(crate) async fn queue_reevaluate(&self, service_ids: Vec<ServiceId>) {
        for id in service_ids {
            let _ = self
                .event_tx
                .send(SupervisorEvent::Reevaluate { service_id: id })
                .await;
        }
    }

    /// Spawn a service process.
    pub(crate) async fn spawn_service(&mut self, id: ServiceId, config: ServiceConfig) {
        let name = config.service.name.clone();

        // Initialize log buffer
        let buffer_lines = config.logging.buffer_lines;
        log::init_buffer(&self.log_buffers, id, buffer_lines).await;

        // Spawn the process
        match process::spawn_process(&config) {
            Ok(SpawnResult {
                child,
                stdout,
                stderr,
                pid,
            }) => {
                // Update state with actual PID and record start time
                {
                    let mut graph = self.graph.write().await;
                    if let Some(service) = graph.get_mut(id) {
                        service.record_started();
                        service.state = ServiceState::Starting { pid };
                    }
                }

                // Start log readers
                let log_buffers = self.log_buffers.clone();
                let name_clone = name.clone();
                tokio::spawn(async move {
                    log::read_stdout(id, name_clone, stdout, log_buffers, None).await;
                });

                let log_buffers = self.log_buffers.clone();
                let name_clone = name.clone();
                tokio::spawn(async move {
                    log::read_stderr(id, name_clone, stderr, log_buffers, None).await;
                });

                // Start process watcher
                let event_tx = self.event_tx.clone();
                let task = tokio::spawn(async move {
                    let (exit_code, signal) = process::wait_for_exit(child).await;
                    let _ = event_tx
                        .send(SupervisorEvent::ProcessExited {
                            service_id: id,
                            exit_code,
                            signal,
                        })
                        .await;
                });
                self.process_tasks.insert(id, task);

                // Schedule default start timeout (may be extended for health checks)
                self.schedule_timeout(id, TimeoutKind::Start, config.lifecycle.start_timeout_ms);

                // Check if service has health checks
                if let Some(ref health) = config.health {
                    // Extract health check timing parameters
                    let (start_period, interval, retries, timeout) = match health {
                        crate::sdk::HealthDef::Tcp { common, .. }
                        | crate::sdk::HealthDef::Http { common, .. }
                        | crate::sdk::HealthDef::Exec { common, .. } => (
                            common.start_period_ms,
                            common.interval_ms,
                            common.retries,
                            common.timeout_ms,
                        ),
                    };

                    // Auto-extend start_timeout to accommodate health checks
                    // Total time needed: start_period + (retries * (interval + timeout)) + buffer
                    let health_time_needed =
                        start_period + (retries as u64 * (interval + timeout)) + 5000;
                    let effective_start_timeout =
                        config.lifecycle.start_timeout_ms.max(health_time_needed);

                    if effective_start_timeout > config.lifecycle.start_timeout_ms {
                        tracing::debug!(
                            service = %name,
                            configured = config.lifecycle.start_timeout_ms,
                            effective = effective_start_timeout,
                            "auto-extended start_timeout to accommodate health checks"
                        );
                    }

                    self.schedule_timeout(id, TimeoutKind::Start, effective_start_timeout);

                    if start_period > 0 {
                        self.schedule_timeout(id, TimeoutKind::HealthCheck, start_period);
                    } else {
                        // Run health check immediately
                        self.run_health_check(id, health.clone());
                    }
                } else {
                    // No health check - transition directly to Running
                    let dependents = {
                        let mut graph = self.graph.write().await;
                        if let Some(service) = graph.get_mut(id) {
                            service.state = ServiceState::Running { pid };
                            // Note: Don't reset backoff here - it will be reset when the
                            // service has been running long enough (stability period).
                            // This happens when the service exits/stops via try_reset_backoff().
                        }
                        graph.dependents(id)
                    };

                    self.cancel_timeout(id, TimeoutKind::Start);
                    self.queue_reevaluate(dependents).await;
                }

                tracing::info!(service = %name, pid = pid, "service started");
            }
            Err(e) => {
                tracing::error!(service = %name, error = %e, "failed to spawn service");

                let mut graph = self.graph.write().await;
                graph.set_state(
                    id,
                    ServiceState::Failed {
                        reason: FailureReason::SpawnError {
                            message: e.to_string(),
                        },
                    },
                );
            }
        }
    }

    /// Run a health check for a service.
    pub(crate) fn run_health_check(&self, id: ServiceId, health: crate::sdk::HealthDef) {
        let event_tx = self.event_tx.clone();

        tokio::spawn(async move {
            let result = check_health(&health).await;
            let _ = event_tx
                .send(SupervisorEvent::HealthCheckResult {
                    service_id: id,
                    passed: result.is_ok(),
                    error: result.err().map(|e| e.to_string()),
                })
                .await;
        });
    }
}