# Zinit Server - Technical Design Document
**Version:** 2.1
**Date:** 2026-01-21
**Status:** Implementation Reference (reflects current codebase)
---
## Table of Contents
1. [Overview](#1-overview)
2. [Architecture](#2-architecture)
3. [Concurrency Model](#3-concurrency-model)
4. [Service State Machine](#4-service-state-machine)
5. [Dependency Graph](#5-dependency-graph)
6. [Configuration](#6-configuration)
7. [Process Management](#7-process-management)
8. [Log Management](#8-log-management)
9. [IPC Protocol](#9-ipc-protocol)
10. [Implementation Details](#10-implementation-details)
11. [Shared Crate: zinit-common](#11-shared-crate-zinit-common)
12. [Project Structure](#12-project-structure)
13. [Crate Dependencies](#13-crate-dependencies)
14. [zinit-pid1 Specification](#14-zinit-pid1-specification)
15. [Xinet Socket Activation](#15-xinet-socket-activation)
16. [State Persistence](#16-state-persistence)
---
## 1. Overview
### 1.1 What We're Building
zinit-server is a process supervisor with dependency management. It:
- Manages service lifecycle (start, stop, restart)
- Enforces dependency ordering and requirements
- Tracks service states with full visibility into blocking reasons
- Captures and buffers logs
- Exposes control via Unix socket RPC
### 1.2 Why Rewrite
The current implementation has:
- Race conditions in async state management
- Broken dependency ordering (`after` not honored)
- State detection issues (services shown blocked while running)
- No visibility into why services are blocked
### 1.3 Design Principles
1. **Explicit state machine** - all states and transitions defined, testable
2. **Validate before mutate** - graph changes validated on copy, then swapped
3. **Hybrid concurrency** - locks for request/response, channels for async events
4. **Visibility** - always know *why* something is blocked or failed
---
## 2. Architecture
### 2.1 System Context
```
┌─────────────────────────────────────────────────────────────────┐
│                           zinit-pid1                            │
│  - Spawns/monitors zinit-server                                 │
│  - Forwards signals (SIGTERM→shutdown, SIGUSR1→soft restart)    │
│  - Reaps orphan zombies                                         │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │ spawns / signals
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          zinit-server                           │
│                         (this document)                         │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │ unix socket
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          zinit-client                           │
│                       (CLI / TUI / Rhai)                        │
└─────────────────────────────────────────────────────────────────┘
```
### 2.2 Internal Components
```
┌────────────────────────────────────────────────────────────────────┐
│                             Supervisor                             │
│                                                                    │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │  graph: Arc<RwLock<ServiceGraph>>                            │  │
│  │    - topology (static after load/reload)                     │  │
│  │    - service configs                                         │  │
│  │    - service states (dynamic)                                │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                    │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │  log_buffers: Arc<RwLock<HashMap<ServiceId, LogBuffer>>>     │  │
│  │    - separate lock from graph (no contention)                │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                    │
│  ┌──────────────────────────────────────────────────────────────┐  │
│  │  event_rx: mpsc::Receiver<SupervisorEvent>                   │  │
│  │    - receives async events from background tasks             │  │
│  └──────────────────────────────────────────────────────────────┘  │
│                                                                    │
└────────────────────────────────────────────────────────────────────┘
         ▲              ▲              ▲             ▲
         │              │              │             │
    via channel    via channel    via channel   via channel
         │              │              │             │
┌────────┴───┐  ┌───────┴───┐  ┌───────┴───┐  ┌──────┴─────┐
│  Process   │  │    Log    │  │   Timer   │  │   Health   │
│  Monitors  │  │  Readers  │  │   Tasks   │  │  Checkers  │
└────────────┘  └───────────┘  └───────────┘  └────────────┘
```
---
## 3. Concurrency Model
### 3.1 Hybrid Approach
Use **locks for synchronous request/response**, **channels for async events**.
| Operation | Mechanism | Why |
|-----------|-----------|-----|
| RPC queries (list, status, why, tree) | `graph.read()` | Fast, allows concurrent reads |
| RPC mutations (start, stop, add) | `graph.write()` | Atomic check + update |
| Log buffer access | `log_buffers.read/write()` | Separate lock, no graph contention |
| Process exit notification | channel → event loop | Can't hold lock while waiting on child |
| Health check results | channel → event loop | Runs async, reports when done |
| Timeout events | channel → event loop | Timer fires independently |
| Log shipping | `try_send` to shipper task | Fire and forget, never blocks |
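
The split in the table can be illustrated with a minimal sketch that uses only the standard library. It is not the supervisor's code: `State`, `Event`, `status`, `run_event_loop` and `demo` are hypothetical stand-ins, and `std::sync` primitives with a thread replace the tokio locks, channels and tasks used by zinit-server.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, RwLock};
use std::thread;

/// Simplified stand-in for the real service state.
#[derive(Debug, Clone, PartialEq)]
pub enum State {
    Running,
    Exited(i32),
}

/// Simplified stand-in for `SupervisorEvent`.
pub enum Event {
    ProcessExited { name: String, exit_code: i32 },
}

pub type Graph = Arc<RwLock<HashMap<String, State>>>;

/// Request/response path: take a short read lock and answer immediately.
pub fn status(graph: &Graph, name: &str) -> Option<State> {
    graph.read().unwrap().get(name).cloned()
}

/// Event path: drain the channel, applying each event under a brief write lock.
/// Returns once every sender has been dropped.
pub fn run_event_loop(graph: &Graph, rx: mpsc::Receiver<Event>) {
    for event in rx {
        match event {
            Event::ProcessExited { name, exit_code } => {
                graph.write().unwrap().insert(name, State::Exited(exit_code));
            }
        }
    }
}

/// Wires both paths together for a single service.
pub fn demo() -> (Option<State>, Option<State>) {
    let graph: Graph = Arc::new(RwLock::new(HashMap::new()));
    graph.write().unwrap().insert("sshd".to_string(), State::Running);
    let before = status(&graph, "sshd");

    let (tx, rx) = mpsc::channel();
    // Stand-in for a process monitor: it holds no lock while "waiting" on the
    // child and only reports the outcome through the channel.
    let monitor = thread::spawn(move || {
        tx.send(Event::ProcessExited { name: "sshd".to_string(), exit_code: 0 })
            .unwrap();
    });
    monitor.join().unwrap();

    // The sender was dropped with the monitor thread, so the loop terminates.
    run_event_loop(&graph, rx);
    (before, status(&graph, "sshd"))
}
```

Calling `demo()` returns the status seen before and after the exit event is processed. Queries never wait on a child process, and the monitor never touches the graph lock.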
### 3.2 Core Types
```rust
use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot, RwLock};
use tokio::task::JoinHandle;
pub struct Supervisor {
/// Dependency graph with service states
pub(crate) graph: Arc<RwLock<ServiceGraph>>,
/// Per-service log ring buffers (separate lock = no contention with graph)
pub(crate) log_buffers: LogBuffers, // Arc<RwLock<HashMap<ServiceId, LogBuffer>>>
/// Send events to supervisor event loop
pub(crate) event_tx: mpsc::Sender<SupervisorEvent>,
/// Receive events from background tasks
event_rx: mpsc::Receiver<SupervisorEvent>,
/// Send logs to shipping task (optional, for vector.io)
_log_shipper_tx: Option<mpsc::Sender<LogLine>>,
/// Active timers, for cancellation
pub(crate) timers: HashMap<(ServiceId, TimeoutKind), JoinHandle<()>>,
/// Process monitor tasks (for cleanup on stop)
pub(crate) process_tasks: HashMap<ServiceId, JoinHandle<()>>,
/// Health check attempt counters
pub(crate) health_attempts: HashMap<ServiceId, u32>,
/// Services pending restart (for deduplication)
pub(crate) pending_restarts: HashSet<ServiceId>,
/// Config directory path
pub(crate) config_dir: PathBuf,
/// Socket path
socket_path: PathBuf,
/// Boot time (preserved across soft restarts)
pub(crate) boot_time: u64,
/// Shutdown flag
shutdown: bool,
/// Running as PID 1 (affects shutdown behavior)
pub(crate) pid1_mode: bool,
/// System service names (protected from bulk operations)
pub(crate) system_service_names: HashSet<String>,
}
/// Events from background tasks TO supervisor
#[derive(Debug)]
pub enum SupervisorEvent {
// Process lifecycle events
ProcessExited {
service_id: ServiceId,
exit_code: Option<i32>,
signal: Option<i32>,
},
HealthCheckResult {
service_id: ServiceId,
passed: bool,
error: Option<String>,
},
Timeout {
service_id: ServiceId,
kind: TimeoutKind,
},
BuiltinCompleted {
service_id: ServiceId,
success: bool,
error: Option<String>,
},
// Dependency re-evaluation
Reevaluate {
service_id: ServiceId,
},
// RPC command events (sent from IPC handlers)
StartService { name: String },
StopService { name: String },
RestartService { name: String },
KillService { name: String, signal: String },
AddService { config: ServiceConfig },
RemoveService { name: String },
Reload { response_tx: oneshot::Sender<Result<ReloadResult, Error>> },
PrepareRestart { response_tx: oneshot::Sender<Result<PrepareRestartResult, Error>> },
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum TimeoutKind {
Start,
Stop,
HealthCheck,
RestartDelay,
}
```
### 3.3 TOCTOU Prevention
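A **time-of-check to time-of-use (TOCTOU)** race occurs when a state check and the update that depends on it run under separate lock acquisitions. Prevent it by holding the write lock for the entire transition: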
```rust
// WRONG - race condition: check and update use separate lock acquisitions
async fn start_service_wrong(&self, name: &str) {
    let (id, state) = {
        let graph = self.graph.read().await;
        let Some(id) = graph.get_by_name(name) else { return };
        (id, graph.get_state(id).clone())
    }; // read lock released
    if state == ServiceState::Inactive {
        // another task could start it here!
        self.graph
            .write()
            .await
            .set_state(id, ServiceState::Starting { pid: 0 }); // oops, double start
    }
}

// CORRECT - atomic check + update
async fn start_service_correct(&self, name: &str) -> Result<(), Error> {
    let mut graph = self.graph.write().await; // exclusive access
    // `get_by_name` returns an Option; this assumes `Error: From<GraphError>`
    let id = graph
        .get_by_name(name)
        .ok_or_else(|| GraphError::UnknownService(name.to_string()))?;

    // check and update under same lock
    match graph.get_state(id).clone() {
        ServiceState::Inactive | ServiceState::Exited { .. } | ServiceState::Failed { .. } => {
            match graph.can_start(id) {
                Ok(()) => {
                    // pid is updated after spawn; timestamps live on `Service` (see 4.2)
                    graph.set_state(id, ServiceState::Starting { pid: 0 });
                    let config = graph.get(id).expect("id was just looked up").config.clone();
                    drop(graph); // release lock before spawning
                    self.spawn_process(id, config).await;
                    Ok(())
                }
                Err(blocked_reason) => {
                    graph.set_state(id, ServiceState::Blocked {
                        waiting_on: blocked_reason.waiting_on(),
                        conflicts_with: blocked_reason.conflicts_with(),
                    });
                    Ok(()) // blocked is not an error
                }
            }
        }
        ServiceState::Running { .. } => Err(Error::AlreadyRunning),
        ServiceState::Starting { .. } | ServiceState::Stopping { .. } => {
            Err(Error::TransitionInProgress)
        }
        ServiceState::Blocked { .. } => {
            // already waiting, maybe re-check deps?
            Ok(())
        }
    }
}
```
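
The effect of the atomic check + update can be shown with a small self-contained sketch. It assumes a blocking `std::sync::RwLock` and threads instead of tokio, and `State`, `try_start` and `race` are hypothetical names used only for illustration.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Inactive,
    Starting,
}

/// Check and update under one write lock, so only one caller can win.
pub fn try_start(state: &RwLock<State>) -> bool {
    let mut guard = state.write().unwrap();
    if *guard == State::Inactive {
        *guard = State::Starting;
        true
    } else {
        false
    }
}

/// Races `callers` threads against the same service and counts the winners.
pub fn race(callers: usize) -> usize {
    let state = Arc::new(RwLock::new(State::Inactive));
    let handles: Vec<_> = (0..callers)
        .map(|_| {
            let state = Arc::clone(&state);
            thread::spawn(move || try_start(&state))
        })
        .collect();
    handles
        .into_iter()
        .map(|h| h.join().unwrap())
        .filter(|won| *won)
        .count()
}
```

However many threads call `try_start` concurrently, exactly one observes `Inactive` and performs the transition. The others see `Starting` and back off.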
### 3.4 Lock Granularity
Keep locks brief. Release before I/O:
```rust
async fn handle_start(&self, name: &str) -> Result<(), Error> {
    // 1. Take write lock, validate, update state, get config
    let (service_id, config) = {
        let mut graph = self.graph.write().await;
        // `get_by_name` returns an Option; this assumes `Error: From<GraphError>`
        let id = graph
            .get_by_name(name)
            .ok_or_else(|| GraphError::UnknownService(name.to_string()))?;
        // ... validation and state transition ...
        graph.set_state(id, ServiceState::Starting { ... });
        let config = graph.get(id).expect("id was just looked up").config.clone();
        (id, config)
    }; // lock released here

    // 2. Spawn process (I/O) - no lock held
    self.spawn_process(service_id, config).await;
    Ok(())
}
```
---
## 4. Service State Machine
### 4.1 States
```
┌──────────────────────────────────┐
│ │
▼ │
┌──────────┐ ┌─────────┐ ┌──────────┐ ┌─────────┐ │
│ Inactive │───▶│ Blocked │───▶│ Starting │───▶│ Running │──┤
└──────────┘ └─────────┘ └──────────┘ └─────────┘ │
▲ │ │ │ │
│ │ ▼ ▼ │
│ │ ┌─────────┐ ┌──────────┐ │
│ └────────▶│ Failed │◀──│ Stopping │ │
│ └─────────┘ └──────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────┐ │
└─────────────────────────│ Exited │◀─┘
└─────────────────────────┘
```
### 4.2 State Definitions
```rust
/// Service state - timestamps tracked separately in Service struct
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "state", rename_all = "snake_case")]
pub enum ServiceState {
/// Configured but never started
Inactive,
/// Waiting on dependencies
Blocked {
waiting_on: Vec<String>, // Service names, not IDs (for serialization)
},
/// Process spawned, waiting for ready signal (health check or immediate)
Starting { pid: u32 },
/// Process running and healthy
Running { pid: u32 },
/// SIGTERM sent, waiting for exit
Stopping { pid: u32 },
/// Process exited (clean or via stop request)
Exited { exit_code: Option<i32> },
/// Process failed
Failed { reason: FailureReason },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FailureReason {
ExitCode { code: i32 },
Signal { signal: i32 },
StartTimeout,
StopTimeout, // had to SIGKILL
HealthCheckFailed { attempts: u32 },
DependencyFailed { service: String },
SpawnError { message: String },
/// Configuration error - a required dependency doesn't exist
MissingDependency { dependency: String },
}
impl ServiceState {
/// For display in CLI
pub fn name(&self) -> &'static str {
match self {
Self::Inactive => "inactive",
Self::Blocked { .. } => "blocked",
Self::Starting { .. } => "starting",
Self::Running { .. } => "running",
Self::Stopping { .. } => "stopping",
Self::Exited { .. } => "exited",
Self::Failed { .. } => "failed",
}
}
/// For ASCII tree display
pub fn symbol(&self) -> &'static str {
match self {
Self::Inactive => "[-]",
Self::Blocked { .. } => "[?]",
Self::Starting { .. } => "[>]",
Self::Running { .. } => "[+]",
Self::Stopping { .. } => "[!]",
Self::Exited { .. } => "[.]",
Self::Failed { .. } => "[X]",
}
}
/// Can this state satisfy a "requires" dependency?
/// Running OR exited successfully (exit code 0) - supports oneshot services.
pub fn is_satisfied(&self) -> bool {
matches!(self, Self::Running { .. } | Self::Exited { exit_code: Some(0) })
}
/// Is a process currently running?
pub fn is_active(&self) -> bool {
matches!(self, Self::Starting { .. } | Self::Running { .. } | Self::Stopping { .. })
}
/// Can we attempt to start from this state?
pub fn can_attempt_start(&self) -> bool {
matches!(self, Self::Inactive | Self::Exited { .. } | Self::Failed { .. })
}
/// Get PID if process is running
pub fn pid(&self) -> Option<u32> {
match self {
Self::Starting { pid, .. } => Some(*pid),
Self::Running { pid, .. } => Some(*pid),
Self::Stopping { pid, .. } => Some(*pid),
_ => None,
}
}
}
```
### 4.3 Targets (Virtual Services)
Targets have no process - they're dependency anchors:
```rust
#[derive(Debug, Clone)]
pub struct Service {
pub name: String,
pub config: ServiceConfigKind, // Either ServiceConfig or TargetConfig
pub state: ServiceState,
// Restart tracking (exponential backoff)
pub restart_count: u32,
pub current_restart_delay_ms: u64,
// Timing information (milliseconds since boot)
pub started_at: Option<u64>,
pub last_state_change: u64,
// Exit history (for diagnostics)
pub last_exit_code: Option<i32>,
pub last_exit_signal: Option<i32>,
// Runtime-added services (not from config files)
pub ephemeral: bool,
}
/// Configuration can be either a service or a target
pub enum ServiceConfigKind {
Service(ServiceConfig),
Target(TargetConfig),
}
impl Service {
pub fn is_target(&self) -> bool {
matches!(self.config, ServiceConfigKind::Target(_))
}
}
impl Service {
pub fn new(name: String, config: ServiceConfig, is_target: bool) -> Self {
let initial_delay = config.lifecycle.restart_delay_ms;
Self {
name,
config,
state: ServiceState::Inactive,
is_target,
restart_count: 0,
current_restart_delay_ms: initial_delay,
}
}
    /// Targets transition directly to Running when deps satisfied
    pub fn target_check_ready(&mut self, graph: &ServiceGraph) {
        if !self.is_target {
            return;
        }
        // If all requires are satisfied, target is "running"
        if graph.all_requires_satisfied(self.id) {
            // pid 0 = no actual process; timestamps live on `Service` (see 4.2)
            self.state = ServiceState::Running { pid: 0 };
        }
    }
/// Get next restart delay with exponential backoff.
/// Returns None if max_restarts exceeded or policy says no restart.
pub fn next_restart_delay(&mut self) -> Option<Duration> {
if !self.should_restart() {
return None;
}
let delay = self.current_restart_delay_ms;
// Exponential backoff: double for next time, capped at max
self.current_restart_delay_ms = (self.current_restart_delay_ms * 2)
.min(self.config.lifecycle.restart_delay_max_ms);
self.restart_count += 1;
Some(Duration::from_millis(delay))
}
/// Reset backoff when service becomes healthy (reaches Running state)
pub fn reset_backoff(&mut self) {
self.restart_count = 0;
self.current_restart_delay_ms = self.config.lifecycle.restart_delay_ms;
}
    /// Check if service should be restarted based on policy and limits
    fn should_restart(&self) -> bool {
        // Check policy first
        let policy_allows = match (&self.state, self.config.lifecycle.restart) {
            // A clean exit is not a failure
            (ServiceState::Exited { exit_code: Some(0), .. }, RestartPolicy::OnFailure) => false,
            (_, RestartPolicy::Never) => false,
            (ServiceState::Exited { .. }, RestartPolicy::Always) => true,
            (ServiceState::Exited { .. }, RestartPolicy::OnFailure) => true,
            (ServiceState::Failed { .. }, RestartPolicy::Always) => true,
            (ServiceState::Failed { .. }, RestartPolicy::OnFailure) => true,
            _ => false,
        };
        if !policy_allows {
            return false;
        }
        // Check max restarts (0 = unlimited)
        let max = self.config.lifecycle.max_restarts;
        if max > 0 && self.restart_count >= max {
            return false;
        }
        true
    }
}
```
**Exponential backoff behavior:**
With defaults: `restart_delay_ms: 1000`, `restart_delay_max_ms: 300000`, `max_restarts: 10`
```
Crash #1 → wait 1s
Crash #2 → wait 2s
Crash #3 → wait 4s
Crash #4 → wait 8s
Crash #5 → wait 16s
Crash #6 → wait 32s
Crash #7 → wait 64s
Crash #8 → wait 128s (~2 min)
Crash #9 → wait 256s (~4 min)
Crash #10 → wait 300s (capped at 5 min)
Crash #11 → give up, stay Failed
```
Total wait before giving up: 811 s (about 13.5 minutes). When a service reaches the Running state, `reset_backoff()` is called, which resets both the restart counter and the delay.
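
The schedule above can be reproduced with a short sketch. `backoff_schedule` is a hypothetical helper, not part of the codebase; it mirrors the doubling and capping rule of `next_restart_delay` for a finite `max_restarts`.

```rust
/// Delays (in ms) waited before each restart attempt: start at `initial_ms`,
/// double after every attempt, and cap the next delay at `max_ms`.
/// The real config treats `max_restarts == 0` as unlimited; this sketch
/// only handles a finite limit and returns an empty list for 0.
pub fn backoff_schedule(initial_ms: u64, max_ms: u64, max_restarts: u32) -> Vec<u64> {
    let mut delays = Vec::new();
    let mut current = initial_ms;
    for _ in 0..max_restarts {
        delays.push(current);
        current = current.saturating_mul(2).min(max_ms);
    }
    delays
}
```

With the defaults, `backoff_schedule(1000, 300_000, 10)` yields ten delays from 1 s to 300 s, which sum to 811 000 ms.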
---
## 5. Dependency Graph
### 5.1 Dependency Types
```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DepType {
/// Ordering only: start this after dependency has STARTED (not necessarily ready)
/// Does not fail if dependency fails.
After,
/// Hard dependency: dependency must be RUNNING (satisfied).
/// Cannot start if dependency not running.
/// If dependency fails while we're running, we may be stopped (configurable).
Requires,
/// Soft dependency: try to start, but ok if fails.
Wants,
/// Mutual exclusion: cannot run at same time.
/// If starting this, conflicting services must stop first.
Conflicts,
}
```
### 5.2 Semantics
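| Type | Blocks start? | Stops dependent if dep fails? | Purpose |
|------|---------------|-------------------------------|---------|
| `after` | Waits for dep to START | No | Ordering preference |
| `requires` | Waits for dep to be RUNNING | Can be configured | Hard dependency |
| `wants` | No | No | Nice-to-have |
| `conflicts` | Waits for dep to STOP | N/A | Mutual exclusion |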
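
As a rough illustration of these start-blocking rules, the sketch below encodes them as a pure function. The `State` enum is a simplified stand-in with no payloads (`ExitedOk` represents an exit with code 0), and `blocks_start` and `can_start` are hypothetical helpers rather than the `ServiceGraph` API.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum DepType {
    After,
    Requires,
    Wants,
    Conflicts,
}

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum State {
    Inactive,
    Blocked,
    Starting,
    Running,
    Stopping,
    ExitedOk,
    Failed,
}

/// Returns true if a dependency of `dep_type` in `dep_state` blocks the start.
pub fn blocks_start(dep_type: DepType, dep_state: State) -> bool {
    match dep_type {
        // Ordering only: the dependency must have left Inactive/Blocked.
        DepType::After => matches!(dep_state, State::Inactive | State::Blocked),
        // Hard dependency: running, or a oneshot that exited with code 0.
        DepType::Requires => !matches!(dep_state, State::Running | State::ExitedOk),
        // Soft dependency: never blocks.
        DepType::Wants => false,
        // Mutual exclusion: blocks while the other service has a live process.
        DepType::Conflicts => {
            matches!(dep_state, State::Starting | State::Running | State::Stopping)
        }
    }
}

/// A service can start only if none of its dependencies blocks it.
pub fn can_start(deps: &[(DepType, State)]) -> bool {
    deps.iter().all(|&(t, s)| !blocks_start(t, s))
}
```

Note that an `after` dependency in `Failed` does not block, since it has already left `Inactive`/`Blocked`, while a `requires` dependency in `Starting` still does.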
### 5.3 Graph Structure
```rust
use petgraph::graph::{DiGraph, NodeIndex};
use petgraph::visit::EdgeRef; // provides `source()` / `target()` on edge references
use std::collections::HashMap;
pub type ServiceId = NodeIndex;
pub struct ServiceGraph {
/// Edge direction: dependency → dependent
/// "sshd requires network" = edge from network to sshd
graph: DiGraph<Service, DepType>,
/// Name → ID lookup
by_name: HashMap<String, ServiceId>,
}
impl ServiceGraph {
pub fn new() -> Self {
Self {
graph: DiGraph::new(),
by_name: HashMap::new(),
}
}
// ─────────────────────────────────────────────────────────────
// Construction
// ─────────────────────────────────────────────────────────────
pub fn add_service(&mut self, service: Service) -> ServiceId {
let name = service.name.clone();
let id = self.graph.add_node(service);
self.by_name.insert(name, id);
id
}
pub fn add_dependency(
&mut self,
dependent: &str, // the service that HAS the dependency
dependency: &str, // the service it depends ON
dep_type: DepType,
) -> Result<(), GraphError> {
let dependent_id = self.by_name.get(dependent)
.ok_or_else(|| GraphError::UnknownService(dependent.to_string()))?;
let dependency_id = self.by_name.get(dependency)
.ok_or_else(|| GraphError::UnknownService(dependency.to_string()))?;
// Edge goes FROM dependency TO dependent
self.graph.add_edge(*dependency_id, *dependent_id, dep_type);
Ok(())
}
// ─────────────────────────────────────────────────────────────
// Validation
// ─────────────────────────────────────────────────────────────
pub fn validate(&self) -> Result<(), GraphError> {
use petgraph::algo::is_cyclic_directed;
if is_cyclic_directed(&self.graph) {
return Err(GraphError::CyclicDependency(self.find_cycle()));
}
Ok(())
}
fn find_cycle(&self) -> Vec<String> {
use petgraph::algo::kosaraju_scc;
// Find strongly connected components with more than 1 node
for scc in kosaraju_scc(&self.graph) {
if scc.len() > 1 {
return scc.iter()
.map(|id| self.graph[*id].name.clone())
.collect();
}
}
vec![]
}
// ─────────────────────────────────────────────────────────────
// Queries
// ─────────────────────────────────────────────────────────────
pub fn get(&self, id: ServiceId) -> Option<&Service> {
self.graph.node_weight(id)
}
pub fn get_mut(&mut self, id: ServiceId) -> Option<&mut Service> {
self.graph.node_weight_mut(id)
}
pub fn get_by_name(&self, name: &str) -> Option<ServiceId> {
self.by_name.get(name).copied()
}
pub fn get_state(&self, id: ServiceId) -> &ServiceState {
&self.graph[id].state
}
pub fn set_state(&mut self, id: ServiceId, state: ServiceState) {
self.graph[id].state = state;
}
pub fn all_services(&self) -> impl Iterator<Item = ServiceId> + '_ {
self.graph.node_indices()
}
/// Get topologically sorted start order
pub fn start_order(&self) -> Vec<ServiceId> {
use petgraph::algo::toposort;
toposort(&self.graph, None).unwrap_or_default()
}
// ─────────────────────────────────────────────────────────────
// Dependency queries
// ─────────────────────────────────────────────────────────────
/// What does this service depend on?
pub fn dependencies(&self, id: ServiceId) -> Vec<(ServiceId, DepType)> {
use petgraph::Direction;
self.graph
.edges_directed(id, Direction::Incoming)
.map(|e| (e.source(), *e.weight()))
.collect()
}
/// What depends on this service?
pub fn dependents(&self, id: ServiceId) -> Vec<ServiceId> {
use petgraph::Direction;
self.graph
.edges_directed(id, Direction::Outgoing)
.map(|e| e.target())
.collect()
}
/// Can this service start right now?
pub fn can_start(&self, id: ServiceId) -> Result<(), BlockedReason> {
let mut waiting_on = Vec::new();
let mut conflicts_with = Vec::new();
for (dep_id, dep_type) in self.dependencies(id) {
let dep_state = &self.graph[dep_id].state;
let dep_name = self.graph[dep_id].name.clone();
match dep_type {
DepType::Requires => {
if !dep_state.is_satisfied() {
waiting_on.push(dep_name);
}
}
DepType::After => {
// Just needs to have started (not still Inactive/Blocked)
if matches!(dep_state, ServiceState::Inactive | ServiceState::Blocked { .. }) {
waiting_on.push(dep_name);
}
}
DepType::Wants => {
// Soft dep - don't block
}
DepType::Conflicts => {
if dep_state.is_active() {
conflicts_with.push(dep_name);
}
}
}
}
if !conflicts_with.is_empty() {
return Err(BlockedReason::ConflictsWith(conflicts_with));
}
if !waiting_on.is_empty() {
return Err(BlockedReason::WaitingOn(waiting_on));
}
Ok(())
}
/// Check if all "requires" deps are satisfied
pub fn all_requires_satisfied(&self, id: ServiceId) -> bool {
self.dependencies(id)
.iter()
.filter(|(_, dt)| *dt == DepType::Requires)
.all(|(dep_id, _)| self.graph[*dep_id].state.is_satisfied())
}
}
#[derive(Debug, Clone)]
pub enum BlockedReason {
WaitingOn(Vec<String>),
ConflictsWith(Vec<String>),
}
impl BlockedReason {
pub fn waiting_on(&self) -> Vec<ServiceId> {
// Would need graph reference to convert names to IDs
vec![]
}
pub fn conflicts_with(&self) -> Vec<ServiceId> {
vec![]
}
}
#[derive(Debug, thiserror::Error)]
pub enum GraphError {
#[error("Unknown service: {0}")]
UnknownService(String),
#[error("Cyclic dependency: {}", .0.join(" → "))]
CyclicDependency(Vec<String>),
#[error("Duplicate service name: {0}")]
DuplicateName(String),
}
```
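
To make the edge direction (dependency → dependent) and the resulting start order concrete, here is a dependency-free sketch. It uses Kahn's algorithm over plain maps instead of petgraph's `toposort`, and the free function `start_order` taking `(dependent, dependency)` pairs is a hypothetical stand-in for the method above.

```rust
use std::collections::{BTreeMap, VecDeque};

/// Computes a start order from `(dependent, dependency)` pairs using Kahn's
/// algorithm. Returns `None` if the dependencies contain a cycle.
pub fn start_order(deps: &[(&str, &str)]) -> Option<Vec<String>> {
    // Edges point from dependency to dependent, as in `ServiceGraph`.
    let mut dependents: BTreeMap<&str, Vec<&str>> = BTreeMap::new();
    // Number of not-yet-started dependencies per service.
    let mut unmet: BTreeMap<&str, usize> = BTreeMap::new();
    for &(dependent, dependency) in deps {
        dependents.entry(dependency).or_default().push(dependent);
        dependents.entry(dependent).or_default();
        *unmet.entry(dependent).or_insert(0) += 1;
        unmet.entry(dependency).or_insert(0);
    }

    // Services with no unmet dependencies can start immediately.
    let mut ready: VecDeque<&str> = unmet
        .iter()
        .filter(|(_, n)| **n == 0)
        .map(|(name, _)| *name)
        .collect();

    let mut order = Vec::new();
    while let Some(name) = ready.pop_front() {
        order.push(name.to_string());
        for &dependent in &dependents[name] {
            let n = unmet.get_mut(dependent).unwrap();
            *n -= 1;
            if *n == 0 {
                ready.push_back(dependent);
            }
        }
    }

    // Any service left out still has unmet dependencies, i.e. sits on a cycle.
    (order.len() == unmet.len()).then_some(order)
}
```

For `sshd` depending on `network` and `sshd-setup`, with `network` depending on `dhcp`, every dependency appears before its dependent. A two-service cycle yields `None`.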
### 5.4 Graph Reload (Hot Reload)
```rust
impl ServiceGraph {
/// Reload configuration from disk.
/// Validates before applying. Running states preserved.
pub fn reload_from_directory(&mut self, path: &Path) -> Result<ReloadDiff, ReloadError> {
// 1. Load candidate graph from disk
let candidate = ServiceGraph::load_from_directory(path)?;
// 2. Validate candidate
candidate.validate()?;
// 3. Compute diff
let diff = self.compute_diff(&candidate);
// 4. Check if removals are safe
for name in &diff.removed {
let id = self.get_by_name(name).unwrap();
let running_deps = self.dependents(id)
.into_iter()
.filter(|d| self.graph[*d].state.is_active())
.map(|d| self.graph[d].name.clone())
.collect::<Vec<_>>();
if !running_deps.is_empty() {
return Err(ReloadError::UnsafeRemoval {
service: name.clone(),
running_dependents: running_deps,
});
}
}
// 5. Preserve states for services that still exist
let mut preserved_states = HashMap::new();
for name in &diff.unchanged {
if let Some(id) = self.get_by_name(name) {
preserved_states.insert(name.clone(), self.graph[id].state.clone());
}
}
for name in &diff.changed {
if let Some(id) = self.get_by_name(name) {
preserved_states.insert(name.clone(), self.graph[id].state.clone());
}
}
// 6. Swap graphs
*self = candidate;
// 7. Restore preserved states
for (name, state) in preserved_states {
if let Some(id) = self.get_by_name(&name) {
self.graph[id].state = state;
}
}
Ok(diff)
}
fn compute_diff(&self, candidate: &ServiceGraph) -> ReloadDiff {
let old_names: HashSet<_> = self.by_name.keys().cloned().collect();
let new_names: HashSet<_> = candidate.by_name.keys().cloned().collect();
ReloadDiff {
added: new_names.difference(&old_names).cloned().collect(),
removed: old_names.difference(&new_names).cloned().collect(),
changed: vec![], // TODO: compare configs
unchanged: old_names.intersection(&new_names).cloned().collect(),
}
}
}
#[derive(Debug)]
pub struct ReloadDiff {
pub added: Vec<String>,
pub removed: Vec<String>,
pub changed: Vec<String>,
pub unchanged: Vec<String>,
}
#[derive(Debug, thiserror::Error)]
pub enum ReloadError {
#[error("Config error: {0}")]
Config(#[from] ConfigError),
#[error("Graph error: {0}")]
Graph(#[from] GraphError),
#[error("Cannot remove {service}: running dependents: {}", running_dependents.join(", "))]
UnsafeRemoval {
service: String,
running_dependents: Vec<String>,
},
}
```
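
`compute_diff` leaves `changed` as a TODO. One possible way to fill it, sketched under the assumption that configs can be compared with `PartialEq`, is shown below. `Diff` and the generic `compute_diff` are hypothetical stand-ins operating on plain name → config maps. Unlike the code above, `unchanged` here excludes changed services.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Default, PartialEq)]
pub struct Diff {
    pub added: Vec<String>,
    pub removed: Vec<String>,
    pub changed: Vec<String>,
    pub unchanged: Vec<String>,
}

/// Compares two name -> config maps. `C` stands in for the real config type.
pub fn compute_diff<C: PartialEq>(
    old: &BTreeMap<String, C>,
    new: &BTreeMap<String, C>,
) -> Diff {
    let mut diff = Diff::default();
    for (name, old_cfg) in old {
        match new.get(name) {
            None => diff.removed.push(name.clone()),
            Some(new_cfg) if new_cfg == old_cfg => diff.unchanged.push(name.clone()),
            Some(_) => diff.changed.push(name.clone()),
        }
    }
    for name in new.keys() {
        if !old.contains_key(name) {
            diff.added.push(name.clone());
        }
    }
    diff
}
```

Using `BTreeMap` keeps each list sorted by name, which makes reload reports stable across runs.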
### 5.5 ASCII Visualization
```rust
impl ServiceGraph {
/// Explain why a service is blocked
pub fn format_why_blocked(&self, name: &str) -> String {
let Some(id) = self.get_by_name(name) else {
return format!("Unknown service: {name}");
};
let service = &self.graph[id];
let mut out = String::new();
match &service.state {
ServiceState::Blocked { waiting_on, conflicts_with } => {
out.push_str(&format!("{} {} (blocked)\n",
service.state.symbol(), name));
if !waiting_on.is_empty() {
for (i, dep_id) in waiting_on.iter().enumerate() {
let dep = &self.graph[*dep_id];
let is_last = i == waiting_on.len() - 1 && conflicts_with.is_empty();
let prefix = if is_last { "└──" } else { "├──" };
out.push_str(&format!(
"{} requires: {} ({}) {}\n",
prefix,
dep.name,
dep.state.name(),
if dep.state.is_satisfied() { "✓" } else { "← waiting" }
));
}
}
for (i, dep_id) in conflicts_with.iter().enumerate() {
let dep = &self.graph[*dep_id];
let is_last = i == conflicts_with.len() - 1;
let prefix = if is_last { "└──" } else { "├──" };
out.push_str(&format!(
"{} conflicts: {} ({}) ← must stop\n",
prefix,
dep.name,
dep.state.name(),
));
}
}
_ => {
out.push_str(&format!("{} {} ({})",
service.state.symbol(), name, service.state.name()));
}
}
out
}
/// Full dependency tree
pub fn format_tree(&self) -> String {
let mut out = String::new();
let order = self.start_order();
// Find root services (nothing depends on them)
let roots: Vec<_> = order.iter()
.filter(|id| self.dependents(**id).is_empty())
.copied()
.collect();
for (i, root) in roots.iter().enumerate() {
self.format_tree_node(*root, &mut out, "", i == roots.len() - 1);
}
out.push_str("\n[-]=inactive [?]=blocked [>]=starting [+]=running [!]=stopping [.]=exited [X]=failed\n");
out
}
fn format_tree_node(&self, id: ServiceId, out: &mut String, prefix: &str, is_last: bool) {
let service = &self.graph[id];
let connector = if is_last { "└── " } else { "├── " };
let child_prefix = format!("{}{}", prefix, if is_last { " " } else { "│ " });
let kind = if service.is_target { " [target]" } else { "" };
out.push_str(&format!(
"{}{}{} {}{} ({})\n",
prefix, connector,
service.state.symbol(),
service.name,
kind,
service.state.name()
));
let deps = self.dependencies(id);
for (i, (dep_id, _)) in deps.iter().enumerate() {
self.format_tree_node(*dep_id, out, &child_prefix, i == deps.len() - 1);
}
}
/// Simple list
pub fn format_list(&self) -> String {
let mut out = String::new();
for id in self.all_services() {
let service = &self.graph[id];
let pid_str = service.state.pid()
.map(|p| format!(" (pid: {})", p))
.unwrap_or_default();
out.push_str(&format!(
"{} {:20} {}{}\n",
service.state.symbol(),
service.name,
service.state.name(),
pid_str
));
}
out
}
}
```
---
## 6. Configuration
### 6.1 Format
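TOML is the primary format. Because the config types derive serde's `Serialize`/`Deserialize`, YAML or JSON could be supported if needed, with the format detected by file extension. The loader in 6.5 reads only `.toml` files.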
**Locations:**
- `/etc/zinit/services/` - service definitions
- `/etc/zinit/targets/` - target definitions
- `~/hero/var/zinit/services/` - user mode
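
If formats other than TOML were added, detection by extension could look roughly like the sketch below. `ConfigFormat` and `detect_format` are hypothetical names, and treating `.yml` as YAML and matching case-insensitively are assumptions, not documented behavior.

```rust
use std::path::Path;

#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ConfigFormat {
    Toml,
    Yaml,
    Json,
}

/// Maps a file extension to a config format; unknown or missing
/// extensions return `None` so the loader can skip the file.
pub fn detect_format(path: &Path) -> Option<ConfigFormat> {
    let ext = path.extension()?.to_str()?.to_ascii_lowercase();
    match ext.as_str() {
        "toml" => Some(ConfigFormat::Toml),
        "yaml" | "yml" => Some(ConfigFormat::Yaml),
        "json" => Some(ConfigFormat::Json),
        _ => None,
    }
}
```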
### 6.2 Service Config
```toml
# /etc/zinit/services/sshd.toml
[service]
name = "sshd"
exec = "/usr/sbin/sshd -D"
dir = "/"
oneshot = false
# env = { KEY = "value" }
[dependencies]
after = ["sshd-setup"]
requires = ["sshd-setup", "network-ready"]
# wants = []
# conflicts = []
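[lifecycle]
restart = "on-failure"         # default; also "always" or "never"
restart_delay_ms = 1000        # default; doubles after each restart
restart_delay_max_ms = 300000  # cap at 5 minutes
max_restarts = 10              # 0 = unlimited
start_timeout_ms = 30000
stop_timeout_ms = 10000
stop_signal = "SIGTERM"
[health]
type = "tcp"                   # required tag: "tcp", "http", or "exec"
target = "127.0.0.1:22"        # example address
interval_ms = 10000
timeout_ms = 5000
retries = 3
start_period_ms = 5000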
[logging]
buffer_lines = 1000
# file = "/var/log/sshd.log"
forward = true
```
### 6.3 Target Config
```toml
# /etc/zinit/targets/network-ready.toml
[target]
name = "network-ready"
[dependencies]
requires = ["dhcp-client", "dns-resolver"]
```
### 6.4 Rust Types
```rust
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServiceConfigFile {
pub service: ServiceDef,
#[serde(default)]
pub dependencies: DependencyDef,
#[serde(default)]
pub lifecycle: LifecycleDef,
#[serde(default)]
pub health: Option<HealthDef>,
#[serde(default)]
pub logging: LoggingDef,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ServiceDef {
pub name: String,
pub exec: String,
#[serde(default = "default_dir")]
pub dir: String,
#[serde(default)]
pub oneshot: bool,
#[serde(default)]
pub env: HashMap<String, String>,
}
#[derive(Debug, Clone, Default, Deserialize, Serialize)]
pub struct DependencyDef {
#[serde(default)]
pub after: Vec<String>,
#[serde(default)]
pub requires: Vec<String>,
#[serde(default)]
pub wants: Vec<String>,
#[serde(default)]
pub conflicts: Vec<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LifecycleDef {
#[serde(default = "default_restart")]
pub restart: RestartPolicy,
#[serde(default = "default_restart_delay")]
pub restart_delay_ms: u64,
#[serde(default = "default_restart_delay_max")]
pub restart_delay_max_ms: u64,
#[serde(default = "default_max_restarts")]
pub max_restarts: u32,
#[serde(default = "default_start_timeout")]
pub start_timeout_ms: u64,
#[serde(default = "default_stop_timeout")]
pub stop_timeout_ms: u64,
#[serde(default = "default_stop_signal")]
pub stop_signal: String,
}
impl Default for LifecycleDef {
fn default() -> Self {
Self {
restart: RestartPolicy::OnFailure,
restart_delay_ms: 1000,
restart_delay_max_ms: 300000,
max_restarts: 10,
start_timeout_ms: 30000,
stop_timeout_ms: 10000,
stop_signal: "SIGTERM".into(),
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Deserialize, Serialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
Always,
#[default]
OnFailure,
Never,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum HealthDef {
Tcp {
target: String,
#[serde(flatten)]
common: HealthCommon,
},
Http {
target: String,
#[serde(default = "default_http_status")]
expect_status: u16,
#[serde(flatten)]
common: HealthCommon,
},
Exec {
target: String,
#[serde(flatten)]
common: HealthCommon,
},
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct HealthCommon {
#[serde(default = "default_health_interval")]
pub interval_ms: u64,
#[serde(default = "default_health_timeout")]
pub timeout_ms: u64,
#[serde(default = "default_health_retries")]
pub retries: u32,
#[serde(default = "default_start_period")]
pub start_period_ms: u64,
}
impl Default for HealthCommon {
fn default() -> Self {
Self {
interval_ms: 10000,
timeout_ms: 5000,
retries: 3,
start_period_ms: 5000,
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct LoggingDef {
#[serde(default = "default_buffer_lines")]
pub buffer_lines: usize,
pub file: Option<String>,
#[serde(default = "default_forward")]
pub forward: bool,
}
impl Default for LoggingDef {
fn default() -> Self {
Self {
buffer_lines: 1000,
file: None,
forward: true,
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TargetConfigFile {
pub target: TargetDef,
#[serde(default)]
pub dependencies: DependencyDef,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TargetDef {
pub name: String,
}
// Default functions
fn default_dir() -> String { "/".into() }
fn default_restart() -> RestartPolicy { RestartPolicy::OnFailure }
fn default_restart_delay() -> u64 { 1000 }
fn default_restart_delay_max() -> u64 { 300000 }
fn default_max_restarts() -> u32 { 10 }
fn default_start_timeout() -> u64 { 30000 }
fn default_stop_timeout() -> u64 { 10000 }
fn default_stop_signal() -> String { "SIGTERM".into() }
fn default_http_status() -> u16 { 200 }
fn default_health_interval() -> u64 { 10000 }
fn default_health_timeout() -> u64 { 5000 }
fn default_health_retries() -> u32 { 3 }
fn default_start_period() -> u64 { 5000 }
fn default_buffer_lines() -> usize { 1000 }
fn default_forward() -> bool { true }
```
### 6.5 Loading
```rust
impl ServiceGraph {
pub fn load_from_directory(path: &Path) -> Result<Self, ConfigError> {
let mut graph = ServiceGraph::new();
// Load services
let services_dir = path.join("services");
if services_dir.exists() {
for entry in std::fs::read_dir(&services_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().map_or(false, |e| e == "toml") {
let config: ServiceConfigFile = toml::from_str(
&std::fs::read_to_string(&path)?
).map_err(|e| ConfigError::Parse {
file: path.clone(),
message: e.to_string()
})?;
let service = Service {
name: config.service.name.clone(),
config: config.into(),
state: ServiceState::Inactive,
is_target: false,
};
graph.add_service(service);
}
}
}
// Load targets
let targets_dir = path.join("targets");
if targets_dir.exists() {
for entry in std::fs::read_dir(&targets_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().map_or(false, |e| e == "toml") {
let config: TargetConfigFile = toml::from_str(
&std::fs::read_to_string(&path)?
).map_err(|e| ConfigError::Parse {
file: path.clone(),
message: e.to_string()
})?;
let service = Service {
name: config.target.name.clone(),
config: config.into(),
state: ServiceState::Inactive,
is_target: true,
};
graph.add_service(service);
}
}
}
// Add dependency edges (second pass, all services now exist)
// ... iterate configs again, call add_dependency ...
Ok(graph)
}
}
```
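The second pass is elided above. A minimal sketch of the idea follows, under the assumption that every dependency name must resolve to a loaded service. `Graph`, `add_node`, `add_edge`, and `build` are hypothetical stand-ins, not the real `ServiceGraph` API.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for the real graph: names map to dense indices,
/// edges are (dependent, dependency) index pairs.
#[derive(Debug, Default)]
pub struct Graph {
    ids: HashMap<String, usize>,
    edges: Vec<(usize, usize)>,
}

impl Graph {
    /// Pass 1: register a node and return its index (the existing index on repeat).
    pub fn add_node(&mut self, name: &str) -> usize {
        let next = self.ids.len();
        *self.ids.entry(name.to_string()).or_insert(next)
    }

    /// Pass 2: add an edge; fails if either endpoint was never registered.
    pub fn add_edge(&mut self, from: &str, to: &str) -> Result<(), String> {
        let f = *self.ids.get(from).ok_or_else(|| format!("unknown service: {from}"))?;
        let t = *self.ids.get(to).ok_or_else(|| format!("unknown service: {to}"))?;
        self.edges.push((f, t));
        Ok(())
    }

    pub fn edge_count(&self) -> usize {
        self.edges.len()
    }
}

/// Build a graph from (name, dependencies) pairs in two passes, so that the
/// order in which files are read does not matter.
pub fn build(configs: &[(&str, Vec<&str>)]) -> Result<Graph, String> {
    let mut graph = Graph::default();
    for (name, _) in configs {
        graph.add_node(name);
    }
    for (name, deps) in configs {
        for dep in deps {
            graph.add_edge(name, dep)?;
        }
    }
    Ok(graph)
}
```

Because all nodes exist before any edge is added, a service may name a dependency whose file is read later; a name that never appears is reported as an error instead of being silently skipped.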
---
## 7. Process Management
### 7.1 Spawning
```rust
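use tokio::process::{Child, ChildStderr, ChildStdout, Command};
use std::os::unix::process::ExitStatusExt; // provides ExitStatus::signal()
use std::process::Stdio;
use nix::unistd::{setpgid, Pid};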
impl Supervisor {
async fn spawn_process(&self, service_id: ServiceId, config: ServiceConfig) {
let event_tx = self.event_tx.clone();
let log_buffers = self.log_buffers.clone();
let log_shipper_tx = self.log_shipper_tx.clone();
tokio::spawn(async move {
match do_spawn(&config).await {
Ok((mut child, stdout, stderr)) => {
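let pid = child.id().unwrap_or(0);
// TODO: report `pid` back to the supervisor (send an event, or pass
// the graph Arc). Until then the state keeps its placeholder pid 0,
// which must never reach kill(2): pid 0 addresses the caller's own
// process group.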
// Spawn log readers
let service_name = config.service.name.clone();
tokio::spawn(read_log_stream(
service_id,
service_name.clone(),
LogStream::Stdout,
stdout,
log_buffers.clone(),
log_shipper_tx.clone(),
));
tokio::spawn(read_log_stream(
service_id,
service_name,
LogStream::Stderr,
stderr,
log_buffers,
log_shipper_tx,
));
// Wait for exit
let status = child.wait().await;
let (exit_code, signal) = match status {
Ok(s) => (s.code(), s.signal()),
Err(_) => (None, None),
};
// Notify supervisor
let _ = event_tx.send(SupervisorEvent::ProcessExited {
service_id,
exit_code,
signal,
}).await;
}
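Err(e) => {
// Spawn itself failed: log the cause, then report it as an exit
// with neither code nor signal so the state machine can react.
tracing::error!(service = %config.service.name, error = %e, "Failed to spawn process");
let _ = event_tx.send(SupervisorEvent::ProcessExited {
service_id,
exit_code: None,
signal: None,
}).await;
}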
}
});
}
}
async fn do_spawn(config: &ServiceConfig) -> Result<(Child, ChildStdout, ChildStderr), Error> {
let mut cmd = Command::new("sh");
cmd.args(["-c", &config.service.exec]);
cmd.current_dir(&config.service.dir);
cmd.envs(&config.service.env);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
// Create new process group
unsafe {
cmd.pre_exec(|| {
let _ = setpgid(Pid::from_raw(0), Pid::from_raw(0));
Ok(())
});
}
let mut child = cmd.spawn()?;
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
Ok((child, stdout, stderr))
}
```
### 7.2 Signals
```rust
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
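/// Signal a single process. Rejects pid 0, because kill(0, sig) would
/// signal every process in the supervisor's own process group.
pub fn send_signal(pid: u32, signal: Signal) -> Result<(), nix::Error> {
if pid == 0 {
return Err(nix::errno::Errno::EINVAL.into());
}
kill(Pid::from_raw(pid as i32), signal)
}
/// Send to the process group led by `pid` (kills children too).
/// Relies on the child having called setpgid(0, 0), so that pgid == pid.
pub fn send_signal_to_group(pid: u32, signal: Signal) -> Result<(), nix::Error> {
if pid == 0 {
return Err(nix::errno::Errno::EINVAL.into());
}
kill(Pid::from_raw(-(pid as i32)), signal)
}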
pub fn parse_signal(name: &str) -> Result<Signal, Error> {
match name.to_uppercase().trim_start_matches("SIG") {
"TERM" => Ok(Signal::SIGTERM),
"KILL" => Ok(Signal::SIGKILL),
"INT" => Ok(Signal::SIGINT),
"HUP" => Ok(Signal::SIGHUP),
"USR1" => Ok(Signal::SIGUSR1),
"USR2" => Ok(Signal::SIGUSR2),
"QUIT" => Ok(Signal::SIGQUIT),
other => Err(Error::UnknownSignal(other.to_string())),
}
}
```
### 7.3 Health Checks
```rust
use tokio::net::TcpStream;
use tokio::process::Command;
use tokio::time::timeout;
use std::time::Duration;
pub async fn check_health(config: &HealthDef) -> Result<(), HealthError> {
match config {
HealthDef::Tcp { target, common } => {
let dur = Duration::from_millis(common.timeout_ms);
timeout(dur, TcpStream::connect(target))
.await
.map_err(|_| HealthError::Timeout)?
.map_err(HealthError::Connect)?;
Ok(())
}
HealthDef::Http { target, expect_status, common } => {
// Simple HTTP GET, check status
// Use reqwest or hyper
todo!()
}
HealthDef::Exec { target, common } => {
let dur = Duration::from_millis(common.timeout_ms);
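// kill_on_drop: a probe that exceeds the timeout is killed, not left running
let mut cmd = Command::new("sh");
cmd.args(["-c", target.as_str()]).kill_on_drop(true);
let output = timeout(dur, cmd.output())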
.await
.map_err(|_| HealthError::Timeout)?
.map_err(HealthError::Exec)?;
if output.status.success() {
Ok(())
} else {
Err(HealthError::NonZeroExit(output.status.code()))
}
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum HealthError {
#[error("Timeout")]
Timeout,
#[error("Connection failed: {0}")]
Connect(std::io::Error),
#[error("Exec failed: {0}")]
Exec(std::io::Error),
#[error("Non-zero exit: {0:?}")]
NonZeroExit(Option<i32>),
}
```
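`check_health` runs a single probe; it does not consume `retries` or `start_period_ms`. One plausible reading of those fields, offered as an assumption rather than the implemented behavior, is that failures inside the start period are ignored and a service becomes unhealthy after `retries` consecutive failures. `HealthTracker` and `Verdict` are hypothetical names.

```rust
use std::time::Duration;

/// Outcome of feeding one probe result into the tracker (hypothetical type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verdict {
    Healthy,
    /// Failure tolerated: inside the start period or below the retry limit.
    Tolerated,
    Unhealthy,
}

/// Counts consecutive probe failures (hypothetical helper).
#[derive(Debug)]
pub struct HealthTracker {
    retries: u32,
    start_period: Duration,
    consecutive_failures: u32,
}

impl HealthTracker {
    pub fn new(retries: u32, start_period_ms: u64) -> Self {
        Self {
            retries,
            start_period: Duration::from_millis(start_period_ms),
            consecutive_failures: 0,
        }
    }

    /// `since_start` is the time elapsed since the process was spawned.
    pub fn record(&mut self, passed: bool, since_start: Duration) -> Verdict {
        if passed {
            self.consecutive_failures = 0;
            return Verdict::Healthy;
        }
        if since_start < self.start_period {
            // Assumed semantics: failures while starting up do not count.
            return Verdict::Tolerated;
        }
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.retries {
            Verdict::Unhealthy
        } else {
            Verdict::Tolerated
        }
    }
}
```

With the defaults (`retries = 3`, `start_period_ms = 5000`), three consecutive failed probes after the first five seconds would mark the service unhealthy, and any passing probe resets the count.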
---
## 8. Log Management
### 8.1 Ring Buffer
```rust
use std::collections::VecDeque;
use std::time::SystemTime;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogStream {
Stdout,
Stderr,
}
#[derive(Debug, Clone)]
pub struct LogLine {
pub timestamp: SystemTime,
pub service: String,
pub stream: LogStream,
pub content: String,
}
pub struct LogBuffer {
lines: VecDeque<LogLine>,
max_lines: usize,
}
impl LogBuffer {
pub fn new(max_lines: usize) -> Self {
Self {
lines: VecDeque::with_capacity(max_lines.min(1000)),
max_lines,
}
}
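pub fn push(&mut self, line: LogLine) {
// A zero-capacity buffer stores nothing
if self.max_lines == 0 {
return;
}
// Evict the oldest line once the buffer is full
if self.lines.len() >= self.max_lines {
self.lines.pop_front();
}
self.lines.push_back(line);
}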
pub fn all(&self) -> impl Iterator<Item = &LogLine> {
self.lines.iter()
}
pub fn last_n(&self, n: usize) -> impl Iterator<Item = &LogLine> {
let skip = self.lines.len().saturating_sub(n);
self.lines.iter().skip(skip)
}
}
```
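To illustrate the eviction behavior, here is a reduced, self-contained variant that stores plain strings instead of `LogLine`. The name `RingLog` is hypothetical and only serves to keep the example apart from the real `LogBuffer`.

```rust
use std::collections::VecDeque;

/// Reduced stand-in for `LogBuffer` that stores plain strings.
pub struct RingLog {
    lines: VecDeque<String>,
    max_lines: usize,
}

impl RingLog {
    pub fn new(max_lines: usize) -> Self {
        Self { lines: VecDeque::new(), max_lines }
    }

    /// Append a line, evicting the oldest one once the buffer is full.
    pub fn push(&mut self, line: &str) {
        if self.max_lines == 0 {
            return; // a zero-capacity buffer keeps nothing
        }
        if self.lines.len() >= self.max_lines {
            self.lines.pop_front();
        }
        self.lines.push_back(line.to_string());
    }

    /// The newest `n` lines, oldest first.
    pub fn last_n(&self, n: usize) -> Vec<&str> {
        let skip = self.lines.len().saturating_sub(n);
        self.lines.iter().skip(skip).map(String::as_str).collect()
    }
}
```

Pushing `a`, `b`, `c`, `d` into a buffer of capacity 3 leaves `b`, `c`, `d`; asking for more lines than are stored simply returns everything.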
### 8.2 Log Reader Task
```rust
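use std::collections::HashMap;
use std::sync::Arc;
use std::time::SystemTime;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::sync::{mpsc, RwLock};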
async fn read_log_stream(
service_id: ServiceId,
service_name: String,
stream: LogStream,
reader: impl tokio::io::AsyncRead + Unpin,
log_buffers: Arc<RwLock<HashMap<ServiceId, LogBuffer>>>,
log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(content)) = lines.next_line().await {
let line = LogLine {
timestamp: SystemTime::now(),
service: service_name.clone(),
stream,
content,
};
// Write to buffer (separate lock, brief)
{
let mut buffers = log_buffers.write().await;
if let Some(buf) = buffers.get_mut(&service_id) {
buf.push(line.clone());
}
}
// Forward to shipper (fire and forget)
if let Some(tx) = &log_shipper_tx {
let _ = tx.try_send(line);
}
}
}
```
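Because the reader uses `try_send`, a slow shipper never blocks log capture; lines are dropped once the channel is full. The same policy can be observed with the standard library's bounded channel, used here only as a dependency-free stand-in for the async channel. `forward` and `bounded` are hypothetical helpers.

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Try to forward a line without blocking. Returns `true` if it was queued,
/// `false` if it was dropped because the queue is full or the receiver is gone.
pub fn forward(tx: &SyncSender<String>, line: &str) -> bool {
    match tx.try_send(line.to_string()) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) | Err(TrySendError::Disconnected(_)) => false,
    }
}

/// Create a bounded queue holding at most `capacity` pending lines.
pub fn bounded(capacity: usize) -> (SyncSender<String>, Receiver<String>) {
    sync_channel(capacity)
}
```

The ring buffer still receives every line, so dropping on a full channel only affects what reaches the remote endpoint.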
### 8.3 Log Shipper (Optional)
```rust
use std::time::Duration;
use tokio::sync::mpsc;
pub struct LogShipper {
rx: mpsc::Receiver<LogLine>,
endpoint: String,
batch_size: usize,
flush_interval: Duration,
}
impl LogShipper {
pub fn new(rx: mpsc::Receiver<LogLine>, endpoint: String) -> Self {
Self {
rx,
endpoint,
batch_size: 100,
flush_interval: Duration::from_secs(5),
}
}
pub async fn run(mut self) {
let mut batch = Vec::with_capacity(self.batch_size);
let mut interval = tokio::time::interval(self.flush_interval);
loop {
tokio::select! {
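maybe_line = self.rx.recv() => {
match maybe_line {
Some(line) => {
batch.push(line);
if batch.len() >= self.batch_size {
self.ship(&mut batch).await;
}
}
// All senders dropped: flush what is left and stop,
// otherwise the loop would spin on the interval forever
None => {
if !batch.is_empty() {
self.ship(&mut batch).await;
}
break;
}
}
}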
_ = interval.tick() => {
if !batch.is_empty() {
self.ship(&mut batch).await;
}
}
}
}
}
async fn ship(&self, batch: &mut Vec<LogLine>) {
// Send to vector.io / HTTP endpoint
// On failure: log warning, drop batch
match self.send_batch(batch).await {
Ok(_) => {}
Err(e) => {
eprintln!("Log shipping failed: {e}");
}
}
batch.clear();
}
async fn send_batch(&self, batch: &[LogLine]) -> Result<(), Error> {
// HTTP POST to self.endpoint
// Use reqwest or hyper
todo!()
}
}
```
---
## 9. IPC Protocol
### 9.1 Transport
Unix domain socket only: `/var/run/zinit.sock` (system socket) or `~/hero/var/zinit.sock` (user socket).
### 9.2 Protocol
JSON-RPC 2.0, newline-delimited.
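
Concretely, each request and each response is a single JSON object terminated by `\n`. The following is a minimal blocking client sketch using only the standard library. `call_raw` is a hypothetical helper, and the request is passed as a pre-serialized string to keep the example dependency-free.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;

/// Send one newline-delimited JSON-RPC request and read one response line.
/// `request_json` must be a single-line JSON object without a trailing newline.
pub fn call_raw(stream: &mut UnixStream, request_json: &str) -> io::Result<String> {
    if request_json.contains('\n') {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "request must not contain a newline",
        ));
    }
    stream.write_all(request_json.as_bytes())?;
    stream.write_all(b"\n")?;
    stream.flush()?;

    // Read exactly one line back. This assumes strict request/response
    // alternation: bytes buffered past the first line would be discarded.
    let mut reader = BufReader::new(&mut *stream);
    let mut response = String::new();
    let n = reader.read_line(&mut response)?;
    if n == 0 {
        return Err(io::Error::new(
            io::ErrorKind::UnexpectedEof,
            "server closed the connection",
        ));
    }
    Ok(response.trim_end_matches('\n').to_string())
}
```

For example, starting a service named `web` (an illustrative name) would be sent as the single line `{"jsonrpc":"2.0","id":1,"method":"service.start","params":{"name":"web"}}`, and a successful reply would read `{"jsonrpc":"2.0","id":1,"result":{"ok":true}}`.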
### 9.3 Methods
For the complete API reference, see [openrpc.json](openrpc.json).
**System Methods:**

| Method | Params | Returns | Description |
|--------|--------|---------|-------------|
| `system.ping` | `{}` | `{version}` | Health check |
| `system.shutdown` | `{}` | `bool` | Stop all services and exit |
| `system.prepare_restart` | `{}` | `{state_file}` | Prepare for restart, save state |

**Service Methods:**

| Method | Params | Returns | Description |
|--------|--------|---------|-------------|
| `service.list` | `{}` | `[names]` | List all service names |
| `service.list_full` | `{}` | `[ServiceStatus]` | List all services with status |
| `service.status` | `{name}` | `ServiceStatus` | Detailed status |
| `service.start` | `{name}` | `{ok}` | Start service |
| `service.stop` | `{name}` | `{ok}` | Stop service |
| `service.restart` | `{name}` | `{ok}` | Stop then start |
| `service.kill` | `{name, signal?}` | `{ok}` | Send signal |
| `service.why` | `{name}` | `WhyBlocked` | Explain blocking |
| `service.tree` | `{}` | `{ascii}` | Dependency tree |
| `service.add` | `{config}` | `{ok}` | Add service at runtime |
| `service.remove` | `{name}` | `{ok}` | Remove service |
| `service.reload` | `{}` | `ReloadResult` | Reload from disk |
| `service.stats` | `{name}` | `ServiceStats` | CPU/memory stats |
| `service.start_all` | `{}` | `{started}` | Start all user services |
| `service.stop_all` | `{}` | `{stopped}` | Stop all user services |
| `service.delete_all` | `{}` | `{deleted}` | Delete all user services |

**Log Methods:**

| Method | Params | Returns | Description |
|--------|--------|---------|-------------|
| `logs.get` | `{name, lines?}` | `[LogLine]` | Get service logs |
| `logs.tail` | `{name, lines?}` | `[LogLine]` | Last N lines |
| `logs.filter` | `{name?, stream?, since?}` | `[LogLine]` | Filtered logs |

**Debug Methods:**

| Method | Params | Returns | Description |
|--------|--------|---------|-------------|
| `debug.state` | `{}` | `DebugState` | Full internal state |
| `debug.process_tree` | `{name}` | `ProcessTree` | Process tree for service |

**Xinet Methods:**

| Method | Params | Returns | Description |
|--------|--------|---------|-------------|
| `xinet.register` | `{config}` | `{ok}` | Register proxy |
| `xinet.unregister` | `{name}` | `{ok}` | Remove proxy |
| `xinet.list` | `{}` | `[names]` | List proxies |
| `xinet.status` | `{name}` | `XinetStatus` | Get proxy status |
| `xinet.status_all` | `{}` | `[XinetStatus]` | All proxy statuses |
### 9.4 Handler
```rust
use std::path::PathBuf;
use std::sync::Arc;
use serde_json::{json, Value};
use tokio::net::{UnixListener, UnixStream};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
pub async fn run_ipc_server(
socket_path: PathBuf,
supervisor: Arc<Supervisor>,
) -> Result<(), Error> {
// Remove stale socket
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path)?;
// Set permissions
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&socket_path,
std::fs::Permissions::from_mode(0o660))?;
}
loop {
let (stream, _) = listener.accept().await?;
let supervisor = supervisor.clone();
tokio::spawn(handle_connection(stream, supervisor));
}
}
async fn handle_connection(stream: UnixStream, supervisor: Arc<Supervisor>) {
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
let response = match serde_json::from_str::<RpcRequest>(&line) {
Ok(request) => {
let result = dispatch(&supervisor, &request).await;
RpcResponse::from_result(request.id, result)
}
Err(e) => RpcResponse::parse_error(e.to_string()),
};
let json = serde_json::to_string(&response).unwrap();
let _ = writer.write_all(json.as_bytes()).await;
let _ = writer.write_all(b"\n").await;
let _ = writer.flush().await;
}
}
async fn dispatch(supervisor: &Supervisor, request: &RpcRequest) -> Result<Value, RpcError> {
match request.method.as_str() {
"system.ping" => {
Ok(json!({"version": env!("CARGO_PKG_VERSION")}))
}
"service.list" => {
// Returns names only, as documented in the methods table;
// per-service state is served by `service.list_full`.
let graph = supervisor.graph.read().await;
let names: Vec<String> = graph.all_services()
.map(|id| graph.get(id).unwrap().name.clone())
.collect();
Ok(json!(names))
}
"service.start" => {
let name = request.params.get("name")
.and_then(|v| v.as_str())
.ok_or(RpcError::InvalidParams)?;
supervisor.start_service(name).await?;
Ok(json!({"ok": true}))
}
"service.why" => {
let name = request.params.get("name")
.and_then(|v| v.as_str())
.ok_or(RpcError::InvalidParams)?;
let graph = supervisor.graph.read().await;
let ascii = graph.format_why_blocked(name);
Ok(json!({"ascii": ascii}))
}
"service.tree" => {
let graph = supervisor.graph.read().await;
let ascii = graph.format_tree();
Ok(json!({"ascii": ascii}))
}
"service.add" => {
let config: ServiceConfigFile = serde_json::from_value(
request.params.get("config").cloned().unwrap_or_default()
).map_err(|_| RpcError::InvalidParams)?;
supervisor.add_service(config).await?;
Ok(json!({"ok": true}))
}
"service.reload" => {
let diff = supervisor.reload().await?;
Ok(json!({
"added": diff.added,
"removed": diff.removed,
"changed": diff.changed,
}))
}
// ... other methods ...
_ => Err(RpcError::MethodNotFound),
}
}
```
---
## 10. Implementation Details
### 10.1 Event Loop
```rust
impl Supervisor {
pub async fn run(&mut self) -> Result<(), Error> {
// Start IPC server
let socket_path = self.socket_path.clone();
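// NOTE: `self.clone()` needs `Supervisor: Clone`, which the owned `event_rx`
// prevents (an mpsc::Receiver is not Clone). Share the IPC-facing state
// through an Arc'd handle or Arc<RwLock<Self>> instead.
let supervisor = Arc::new(self.clone());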
tokio::spawn(run_ipc_server(socket_path, supervisor.clone()));
// Start all services in dependency order
self.start_all().await;
// Event loop
while let Some(event) = self.event_rx.recv().await {
self.handle_event(event).await;
}
Ok(())
}
async fn handle_event(&mut self, event: SupervisorEvent) {
match event {
SupervisorEvent::ProcessExited { service_id, exit_code, signal } => {
let mut graph = self.graph.write().await;
let new_state = match (exit_code, signal) {
(Some(0), _) => ServiceState::Exited {
exit_code: Some(0),
exited_at: Instant::now(),
},
(Some(code), _) => ServiceState::Failed {
reason: FailureReason::ExitCode(code),
failed_at: Instant::now(),
},
(_, Some(sig)) => ServiceState::Failed {
reason: FailureReason::Signal(sig),
failed_at: Instant::now(),
},
_ => ServiceState::Failed {
reason: FailureReason::ExitCode(-1),
failed_at: Instant::now(),
},
};
graph.set_state(service_id, new_state.clone());
// Notify dependents
let dependents = graph.dependents(service_id);
drop(graph); // release before recursing
for dep_id in dependents {
self.reevaluate_service(dep_id).await;
}
// Maybe restart
self.maybe_schedule_restart(service_id).await;
}
SupervisorEvent::HealthCheckResult { service_id, passed, error } => {
let mut graph = self.graph.write().await;
if passed {
if let ServiceState::Starting { pid, .. } = graph.get_state(service_id).clone() {
graph.set_state(service_id, ServiceState::Running {
pid,
ready_at: Instant::now(),
});
// Reset exponential backoff - service is healthy
graph.get_mut(service_id).unwrap().reset_backoff();
// Notify dependents they can start
let dependents = graph.dependents(service_id);
drop(graph);
for dep_id in dependents {
self.reevaluate_service(dep_id).await;
}
}
} else {
// Increment failure counter, maybe kill
// TODO: track health check attempts
}
}
SupervisorEvent::Timeout { service_id, kind } => {
match kind {
TimeoutKind::Start => {
let mut graph = self.graph.write().await;
if let ServiceState::Starting { pid, .. } = graph.get_state(service_id).clone() {
// Still starting after timeout = failed
graph.set_state(service_id, ServiceState::Failed {
reason: FailureReason::StartTimeout,
failed_at: Instant::now(),
});
drop(graph);
// Kill the whole process group so children of `sh -c` die too
let _ = send_signal_to_group(pid, Signal::SIGKILL);
}
}
TimeoutKind::Stop => {
let mut graph = self.graph.write().await;
if let ServiceState::Stopping { pid, .. } = graph.get_state(service_id).clone() {
// Didn't exit gracefully: SIGKILL the whole process group
let _ = send_signal_to_group(pid, Signal::SIGKILL);
graph.set_state(service_id, ServiceState::Exited {
exit_code: None,
exited_at: Instant::now(),
});
}
}
TimeoutKind::RestartDelay => {
self.try_start_service(service_id).await;
}
TimeoutKind::HealthCheck => {
// Schedule next health check
self.run_health_check(service_id).await;
}
}
}
}
}
async fn reevaluate_service(&self, service_id: ServiceId) {
let mut graph = self.graph.write().await;
// Copy what is needed up front so that no borrow of `graph` is still
// alive when it is mutated or dropped below.
let (is_blocked, is_target, config) = {
let service = graph.get(service_id).unwrap();
(
matches!(service.state, ServiceState::Blocked { .. }),
service.is_target,
service.config.clone(),
)
};
// Only blocked services are re-evaluated
if !is_blocked {
return;
}
match graph.can_start(service_id) {
Ok(()) if is_target => {
// Dependencies satisfied; targets go straight to Running
graph.set_state(service_id, ServiceState::Running {
pid: 0,
ready_at: Instant::now(),
});
}
Ok(()) => {
// Dependencies satisfied; spawn the process
graph.set_state(service_id, ServiceState::Starting {
pid: 0,
started_at: Instant::now(),
});
drop(graph);
self.spawn_process(service_id, config).await;
}
Err(reason) => {
// Still blocked: resolve names to IDs first, then update the reason
let (waiting_on, conflicts_with) = match &reason {
BlockedReason::WaitingOn(v) => (
v.iter().filter_map(|n| graph.get_by_name(n)).collect(),
vec![],
),
BlockedReason::ConflictsWith(v) => (
vec![],
v.iter().filter_map(|n| graph.get_by_name(n)).collect(),
),
_ => (vec![], vec![]),
};
graph.set_state(service_id, ServiceState::Blocked { waiting_on, conflicts_with });
}
}
}
}
```
### 10.2 Startup Sequence
```rust
impl Supervisor {
// `&mut self`: try_start_service() ends up in schedule_timeout(), which
// records the timer handle in `self.timers`.
async fn start_all(&mut self) {
let graph = self.graph.read().await;
let order = graph.start_order();
drop(graph);
for service_id in order {
self.try_start_service(service_id).await;
}
}
async fn try_start_service(&mut self, service_id: ServiceId) {
let mut graph = self.graph.write().await;
// Copy what is needed up front so that no borrow of `graph` is still
// alive when it is mutated or dropped below.
let (can_attempt, is_target, config) = {
let service = graph.get(service_id).unwrap();
(service.state.can_attempt_start(), service.is_target, service.config.clone())
};
// Only start if in a startable state
if !can_attempt {
return;
}
match graph.can_start(service_id) {
Ok(()) if is_target => {
// Targets have no process: go straight to Running
graph.set_state(service_id, ServiceState::Running {
pid: 0,
ready_at: Instant::now(),
});
// Notify dependents
let dependents = graph.dependents(service_id);
drop(graph);
for dep_id in dependents {
self.reevaluate_service(dep_id).await;
}
}
Ok(()) => {
graph.set_state(service_id, ServiceState::Starting {
pid: 0,
started_at: Instant::now(),
});
drop(graph);
self.spawn_process(service_id, config).await;
self.schedule_timeout(service_id, TimeoutKind::Start).await;
}
Err(_reason) => {
graph.set_state(service_id, ServiceState::Blocked {
waiting_on: vec![], // TODO: convert names to IDs
conflicts_with: vec![],
});
}
}
}
}
```
### 10.3 Timer Management
```rust
impl Supervisor {
/// Schedule a restart with exponential backoff.
/// Returns true if restart was scheduled, false if max restarts exceeded.
async fn maybe_schedule_restart(&mut self, service_id: ServiceId) -> bool {
let mut graph = self.graph.write().await;
let service = graph.get_mut(service_id).unwrap();
if let Some(delay) = service.next_restart_delay() {
drop(graph);
// Schedule restart after delay
let event_tx = self.event_tx.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(delay).await;
let _ = event_tx.send(SupervisorEvent::Timeout {
service_id,
kind: TimeoutKind::RestartDelay
}).await;
});
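// Abort any timer already pending for this key so it cannot fire a stale event
if let Some(old) = self.timers.insert((service_id, TimeoutKind::RestartDelay), handle.abort_handle()) {
old.abort();
}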
true
} else {
// Max restarts exceeded or policy says no restart
tracing::warn!(
service = %service.name,
restart_count = service.restart_count,
"Not restarting: max restarts exceeded or restart policy forbids it"
);
false
}
}
async fn schedule_timeout(&mut self, service_id: ServiceId, kind: TimeoutKind) {
let duration = match kind {
TimeoutKind::Start => {
let graph = self.graph.read().await;
Duration::from_millis(graph.get(service_id).unwrap().config.lifecycle.start_timeout_ms)
}
TimeoutKind::Stop => {
let graph = self.graph.read().await;
Duration::from_millis(graph.get(service_id).unwrap().config.lifecycle.stop_timeout_ms)
}
TimeoutKind::RestartDelay => {
// Don't use this for restart - use maybe_schedule_restart() instead
// which handles exponential backoff
panic!("Use maybe_schedule_restart() for restart scheduling");
}
TimeoutKind::HealthCheck => {
let graph = self.graph.read().await;
if let Some(health) = &graph.get(service_id).unwrap().config.health {
Duration::from_millis(health.common().interval_ms)
} else {
return;
}
}
};
let event_tx = self.event_tx.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(duration).await;
let _ = event_tx.send(SupervisorEvent::Timeout { service_id, kind }).await;
});
// Store handle for cancellation
// (abort any timer already pending for the same key first)
if let Some(old) = self.timers.insert((service_id, kind), handle.abort_handle()) {
old.abort();
}
}
fn cancel_timeout(&mut self, service_id: ServiceId, kind: TimeoutKind) {
if let Some(handle) = self.timers.remove(&(service_id, kind)) {
handle.abort();
}
}
}
```
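`next_restart_delay()` is not shown in this section. One common formulation of exponential backoff, given here as an assumption rather than as the actual implementation, doubles `restart_delay_ms` on each attempt, caps the result at `restart_delay_max_ms`, and gives up after `max_restarts` attempts. `Backoff` is a hypothetical type.

```rust
use std::time::Duration;

/// Hypothetical backoff state mirroring the `lifecycle` config fields.
#[derive(Debug, Clone)]
pub struct Backoff {
    base_ms: u64,
    max_ms: u64,
    max_restarts: u32,
    restart_count: u32,
}

impl Backoff {
    pub fn new(base_ms: u64, max_ms: u64, max_restarts: u32) -> Self {
        Self { base_ms, max_ms, max_restarts, restart_count: 0 }
    }

    /// Delay before the next restart, or `None` once the budget is spent.
    pub fn next_delay(&mut self) -> Option<Duration> {
        if self.restart_count >= self.max_restarts {
            return None;
        }
        // base * 2^count, saturating instead of overflowing, then capped.
        let factor = 1u64.checked_shl(self.restart_count).unwrap_or(u64::MAX);
        let delay_ms = self.base_ms.saturating_mul(factor).min(self.max_ms);
        self.restart_count += 1;
        Some(Duration::from_millis(delay_ms))
    }

    /// Called once the service is healthy again.
    pub fn reset(&mut self) {
        self.restart_count = 0;
    }
}
```

With the documented defaults (1000 ms base, 300000 ms cap, 10 restarts) this sketch yields 1 s, 2 s, 4 s and so on up to 256 s, then one capped delay of 300 s, after which it returns `None`.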
---
## 11. Shared Crate: zinit-common
A shared crate containing types and functionality used by server, client, and pid1.
### 11.1 Design Principles
- **No heavy dependencies** - just serde + serde_json
- **No async** - blocking client, async version in zinit-client if needed
- **Pure data types** - easily serializable
- **Shared logic** - validation, signal parsing, client helpers
### 11.2 Structure
```
zinit-common/
├── Cargo.toml
└── src/
├── lib.rs # Re-exports
├── state.rs # ServiceState, FailureReason
├── config.rs # ServiceConfig, DependencyDef, etc.
├── protocol.rs # RpcRequest, RpcResponse, error codes
├── responses.rs # ServiceInfo, ServiceStatus, LogLine
├── socket.rs # Path constants and helpers
├── signal.rs # Signal name/number conversion
├── validate.rs # Config validation
└── client.rs # Blocking ZinitClient
```
### 11.3 Cargo.toml
```toml
[package]
name = "zinit-common"
version = "0.1.0"
edition = "2021"
[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = "1"
```
### 11.4 State Types (state.rs)
```rust
use serde::{Deserialize, Serialize};
/// Service state - shared between server and client
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "lowercase")]
pub enum ServiceState {
Inactive,
Blocked { waiting_on: Vec<String> },
Starting { pid: u32 },
Running { pid: u32 },
Stopping { pid: u32 },
Exited { exit_code: Option<i32> },
Failed { reason: FailureReason },
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum FailureReason {
ExitCode { code: i32 },
Signal { signal: i32 },
StartTimeout,
StopTimeout,
HealthCheckFailed { attempts: u32 },
DependencyFailed { service: String },
SpawnError { message: String },
}
impl ServiceState {
pub fn name(&self) -> &'static str {
match self {
Self::Inactive => "inactive",
Self::Blocked { .. } => "blocked",
Self::Starting { .. } => "starting",
Self::Running { .. } => "running",
Self::Stopping { .. } => "stopping",
Self::Exited { .. } => "exited",
Self::Failed { .. } => "failed",
}
}
pub fn symbol(&self) -> &'static str {
match self {
Self::Inactive => "[-]",
Self::Blocked { .. } => "[?]",
Self::Starting { .. } => "[>]",
Self::Running { .. } => "[+]",
Self::Stopping { .. } => "[!]",
Self::Exited { .. } => "[.]",
Self::Failed { .. } => "[X]",
}
}
pub fn pid(&self) -> Option<u32> {
match self {
Self::Starting { pid } => Some(*pid),
Self::Running { pid } => Some(*pid),
Self::Stopping { pid } => Some(*pid),
_ => None,
}
}
pub fn is_active(&self) -> bool {
matches!(self, Self::Starting { .. } | Self::Running { .. } | Self::Stopping { .. })
}
pub fn is_satisfied(&self) -> bool {
matches!(self, Self::Running { .. })
}
pub fn can_attempt_start(&self) -> bool {
matches!(self, Self::Inactive | Self::Exited { .. } | Self::Failed { .. })
}
}
impl FailureReason {
pub fn display(&self) -> String {
match self {
Self::ExitCode { code } => format!("exit code {}", code),
Self::Signal { signal } => format!("signal {}", crate::signal::name(*signal)),
Self::StartTimeout => "start timeout".into(),
Self::StopTimeout => "stop timeout".into(),
Self::HealthCheckFailed { attempts } => format!("health check failed after {} attempts", attempts),
Self::DependencyFailed { service } => format!("dependency {} failed", service),
Self::SpawnError { message } => format!("spawn error: {}", message),
}
}
}
```
### 11.5 Config Types (config.rs)
```rust
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceConfig {
pub service: ServiceDef,
#[serde(default)]
pub dependencies: DependencyDef,
#[serde(default)]
pub lifecycle: LifecycleDef,
#[serde(default)]
pub health: Option<HealthDef>,
#[serde(default)]
pub logging: LoggingDef,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceDef {
pub name: String,
pub exec: String,
#[serde(default = "default_dir")]
pub dir: String,
#[serde(default)]
pub oneshot: bool,
#[serde(default)]
pub env: HashMap<String, String>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct DependencyDef {
#[serde(default)]
pub after: Vec<String>,
#[serde(default)]
pub requires: Vec<String>,
#[serde(default)]
pub wants: Vec<String>,
#[serde(default)]
pub conflicts: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LifecycleDef {
#[serde(default)]
pub restart: RestartPolicy,
#[serde(default = "default_restart_delay")]
pub restart_delay_ms: u64,
#[serde(default = "default_restart_delay_max")]
pub restart_delay_max_ms: u64,
#[serde(default = "default_max_restarts")]
pub max_restarts: u32,
#[serde(default = "default_start_timeout")]
pub start_timeout_ms: u64,
#[serde(default = "default_stop_timeout")]
pub stop_timeout_ms: u64,
#[serde(default = "default_stop_signal")]
pub stop_signal: String,
}
impl Default for LifecycleDef {
fn default() -> Self {
Self {
restart: RestartPolicy::default(),
restart_delay_ms: default_restart_delay(),
restart_delay_max_ms: default_restart_delay_max(),
max_restarts: default_max_restarts(),
start_timeout_ms: default_start_timeout(),
stop_timeout_ms: default_stop_timeout(),
stop_signal: default_stop_signal(),
}
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum RestartPolicy {
Always,
#[default]
OnFailure,
Never,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "lowercase")]
pub enum HealthDef {
Tcp {
target: String,
#[serde(flatten)]
common: HealthCommon,
},
Http {
target: String,
#[serde(default = "default_http_status")]
expect_status: u16,
#[serde(flatten)]
common: HealthCommon,
},
Exec {
target: String,
#[serde(flatten)]
common: HealthCommon,
},
}
impl HealthDef {
pub fn common(&self) -> &HealthCommon {
match self {
Self::Tcp { common, .. } => common,
Self::Http { common, .. } => common,
Self::Exec { common, .. } => common,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthCommon {
#[serde(default = "default_health_interval")]
pub interval_ms: u64,
#[serde(default = "default_health_timeout")]
pub timeout_ms: u64,
#[serde(default = "default_health_retries")]
pub retries: u32,
#[serde(default = "default_start_period")]
pub start_period_ms: u64,
}
impl Default for HealthCommon {
fn default() -> Self {
Self {
interval_ms: default_health_interval(),
timeout_ms: default_health_timeout(),
retries: default_health_retries(),
start_period_ms: default_start_period(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoggingDef {
#[serde(default = "default_buffer_lines")]
pub buffer_lines: usize,
pub file: Option<String>,
#[serde(default = "default_forward")]
pub forward: bool,
}
impl Default for LoggingDef {
fn default() -> Self {
Self {
buffer_lines: default_buffer_lines(),
file: None,
forward: default_forward(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetConfig {
pub target: TargetDef,
#[serde(default)]
pub dependencies: DependencyDef,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TargetDef {
pub name: String,
}
// Default functions
fn default_dir() -> String { "/".into() }
fn default_restart_delay() -> u64 { 1000 }
fn default_restart_delay_max() -> u64 { 300000 }
fn default_max_restarts() -> u32 { 10 }
fn default_start_timeout() -> u64 { 30000 }
fn default_stop_timeout() -> u64 { 10000 }
fn default_stop_signal() -> String { "SIGTERM".into() }
fn default_http_status() -> u16 { 200 }
fn default_health_interval() -> u64 { 10000 }
fn default_health_timeout() -> u64 { 5000 }
fn default_health_retries() -> u32 { 3 }
fn default_start_period() -> u64 { 5000 }
fn default_buffer_lines() -> usize { 1000 }
fn default_forward() -> bool { true }
```
### 11.6 Protocol Types (protocol.rs)
```rust
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcRequest {
pub jsonrpc: String,
pub id: u64,
pub method: String,
#[serde(default)]
pub params: Value,
}
impl RpcRequest {
pub fn new(id: u64, method: impl Into<String>, params: Value) -> Self {
Self {
jsonrpc: "2.0".into(),
id,
method: method.into(),
params,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcResponse {
pub jsonrpc: String,
pub id: u64,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<RpcError>,
}
impl RpcResponse {
pub fn success(id: u64, result: Value) -> Self {
Self {
jsonrpc: "2.0".into(),
id,
result: Some(result),
error: None,
}
}
pub fn error(id: u64, code: i32, message: impl Into<String>) -> Self {
Self {
jsonrpc: "2.0".into(),
id,
result: None,
error: Some(RpcError {
code,
message: message.into(),
data: None,
}),
}
}
pub fn is_ok(&self) -> bool {
self.error.is_none()
}
pub fn into_result<T: serde::de::DeserializeOwned>(self) -> Result<T, RpcError> {
if let Some(err) = self.error {
Err(err)
} else if let Some(result) = self.result {
serde_json::from_value(result).map_err(|e| RpcError {
code: error_codes::INTERNAL_ERROR,
message: e.to_string(),
data: None,
})
} else {
serde_json::from_value(Value::Null).map_err(|e| RpcError {
code: error_codes::INTERNAL_ERROR,
message: e.to_string(),
data: None,
})
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RpcError {
pub code: i32,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<Value>,
}
impl std::fmt::Display for RpcError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "[{}] {}", self.code, self.message)
}
}
impl std::error::Error for RpcError {}
pub mod error_codes {
pub const PARSE_ERROR: i32 = -32700;
pub const INVALID_REQUEST: i32 = -32600;
pub const METHOD_NOT_FOUND: i32 = -32601;
pub const INVALID_PARAMS: i32 = -32602;
pub const INTERNAL_ERROR: i32 = -32603;
// Custom error codes
pub const SERVICE_NOT_FOUND: i32 = -32000;
pub const SERVICE_ALREADY_RUNNING: i32 = -32001;
pub const SERVICE_NOT_RUNNING: i32 = -32002;
pub const INVALID_CONFIG: i32 = -32003;
pub const CYCLE_DETECTED: i32 = -32004;
pub const UNSAFE_REMOVAL: i32 = -32005;
}
```
### 11.7 Response Types (responses.rs)
```rust
use serde::{Deserialize, Serialize};
use crate::state::ServiceState;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DepType {
After,
Requires,
Wants,
Conflicts,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceInfo {
pub name: String,
pub state: ServiceState,
pub is_target: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceStatus {
pub name: String,
pub state: ServiceState,
pub is_target: bool,
pub dependencies: Vec<DependencyInfo>,
pub uptime_secs: Option<u64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyInfo {
pub name: String,
pub dep_type: DepType,
pub state: ServiceState,
pub satisfied: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReloadResult {
pub added: Vec<String>,
pub removed: Vec<String>,
pub changed: Vec<String>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum LogStream {
Stdout,
Stderr,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogLine {
pub timestamp_ms: u64,
pub service: String,
pub stream: LogStream,
pub content: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WhyBlocked {
pub name: String,
pub blocked: bool,
pub waiting_on: Vec<String>,
pub conflicts_with: Vec<String>,
pub ascii: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TreeResponse {
pub ascii: String,
}
```
### 11.8 Socket Helpers (socket.rs)
```rust
use std::path::PathBuf;
pub const SYSTEM_SOCKET: &str = "/var/run/zinit.sock";
pub const USER_SOCKET_SUFFIX: &str = "hero/var/zinit.sock";
/// Get the default socket path.
/// Prefers system socket if it exists, otherwise user socket.
pub fn default_path() -> PathBuf {
if PathBuf::from(SYSTEM_SOCKET).exists() {
PathBuf::from(SYSTEM_SOCKET)
} else if let Some(home) = std::env::var_os("HOME") {
PathBuf::from(home).join(USER_SOCKET_SUFFIX)
} else {
PathBuf::from(SYSTEM_SOCKET)
}
}
/// Get the user socket path.
pub fn user_path() -> Option<PathBuf> {
std::env::var_os("HOME").map(|home| PathBuf::from(home).join(USER_SOCKET_SUFFIX))
}
/// Get the system socket path.
pub fn system_path() -> PathBuf {
PathBuf::from(SYSTEM_SOCKET)
}
```
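The lookup order in `default_path()` is easier to test when the environment is passed in rather than read directly. The sketch below restates the same three-way decision as a pure function; `resolve` and its parameters are hypothetical and exist only for illustration.

```rust
use std::ffi::OsStr;
use std::path::PathBuf;

const SYSTEM_SOCKET: &str = "/var/run/zinit.sock";
const USER_SOCKET_SUFFIX: &str = "hero/var/zinit.sock";

/// Pure restatement of the lookup order:
/// system socket if present, else the per-user socket, else the system path as a last resort.
fn resolve(system_exists: bool, home: Option<&OsStr>) -> PathBuf {
    match (system_exists, home) {
        (true, _) => PathBuf::from(SYSTEM_SOCKET),
        (false, Some(home)) => PathBuf::from(home).join(USER_SOCKET_SUFFIX),
        (false, None) => PathBuf::from(SYSTEM_SOCKET),
    }
}
```

Note the last arm: with no system socket and no `HOME`, the function still returns the system path, so a later `connect` fails with a clear "not found" error instead of the lookup itself failing.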
### 11.9 Signal Helpers (signal.rs)
```rust
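/// Parse a signal name ("TERM", "SIGTERM", "sigterm") or a positive number ("15").
/// Numbers follow the common Linux layout (x86, ARM); USR1/USR2 differ on some architectures.
pub fn parse(name: &str) -> Option<i32> {
    let trimmed = name.trim();
    let upper = trimmed.to_uppercase();
    // Strip at most one "SIG" prefix, so "SIGSIGTERM" is rejected.
    let short = upper.strip_prefix("SIG").unwrap_or(upper.as_str());
    match short {
        "HUP" => Some(1),
        "INT" => Some(2),
        "QUIT" => Some(3),
        "KILL" => Some(9),
        "USR1" => Some(10),
        "USR2" => Some(12),
        "TERM" => Some(15),
        // Numeric fallback; zero and negative values are not valid signals to send.
        _ => trimmed.parse::<i32>().ok().filter(|n| *n > 0),
    }
}
/// Get signal name from number
pub fn name(sig: i32) -> &'static str {
    match sig {
        1 => "SIGHUP",
        2 => "SIGINT",
        3 => "SIGQUIT",
        9 => "SIGKILL",
        10 => "SIGUSR1",
        12 => "SIGUSR2",
        15 => "SIGTERM",
        _ => "UNKNOWN",
    }
}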
```
### 11.10 Config Validation (validate.rs)
```rust
use crate::config::{ServiceConfig, TargetConfig};
/// Validate a service config.
/// Returns list of errors (empty = valid).
pub fn validate_service(config: &ServiceConfig) -> Vec<String> {
let mut errors = Vec::new();
if config.service.name.is_empty() {
errors.push("service.name is required".into());
}
if config.service.name.contains('/') || config.service.name.contains('\0') {
errors.push("service.name contains invalid characters".into());
}
if config.service.exec.is_empty() {
errors.push("service.exec is required".into());
}
if config.lifecycle.restart_delay_ms == 0 {
errors.push("lifecycle.restart_delay_ms must be > 0".into());
}
if config.lifecycle.start_timeout_ms == 0 {
errors.push("lifecycle.start_timeout_ms must be > 0".into());
}
if config.lifecycle.stop_timeout_ms == 0 {
errors.push("lifecycle.stop_timeout_ms must be > 0".into());
}
if crate::signal::parse(&config.lifecycle.stop_signal).is_none() {
errors.push(format!("invalid stop_signal: {}", config.lifecycle.stop_signal));
}
if let Some(ref health) = config.health {
let common = health.common();
if common.retries == 0 {
errors.push("health.retries must be > 0".into());
}
if common.timeout_ms == 0 {
errors.push("health.timeout_ms must be > 0".into());
}
if common.interval_ms == 0 {
errors.push("health.interval_ms must be > 0".into());
}
}
if config.logging.buffer_lines == 0 {
errors.push("logging.buffer_lines must be > 0".into());
}
errors
}
/// Validate a target config.
pub fn validate_target(config: &TargetConfig) -> Vec<String> {
let mut errors = Vec::new();
if config.target.name.is_empty() {
errors.push("target.name is required".into());
}
if config.target.name.contains('/') || config.target.name.contains('\0') {
errors.push("target.name contains invalid characters".into());
}
errors
}
```
### 11.11 Blocking Client (client.rs)
```rust
use std::io::{BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::path::Path;
use std::time::Duration;
use crate::config::ServiceConfig;
use crate::protocol::{RpcError, RpcRequest, RpcResponse};
use crate::responses::*;
use crate::state::ServiceState;
/// Blocking client for zinit-server.
/// For async usage, wrap in tokio::task::spawn_blocking or use async client in zinit-client.
pub struct ZinitClient {
stream: UnixStream,
reader: BufReader<UnixStream>,
next_id: u64,
}
#[derive(Debug)]
pub enum ClientError {
Io(std::io::Error),
Json(serde_json::Error),
Rpc(RpcError),
}
impl std::fmt::Display for ClientError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Io(e) => write!(f, "IO error: {}", e),
Self::Json(e) => write!(f, "JSON error: {}", e),
Self::Rpc(e) => write!(f, "RPC error: {}", e),
}
}
}
impl std::error::Error for ClientError {}
impl From<std::io::Error> for ClientError {
fn from(e: std::io::Error) -> Self { Self::Io(e) }
}
impl From<serde_json::Error> for ClientError {
fn from(e: serde_json::Error) -> Self { Self::Json(e) }
}
impl From<RpcError> for ClientError {
fn from(e: RpcError) -> Self { Self::Rpc(e) }
}
impl ZinitClient {
/// Connect to zinit server at the given socket path.
pub fn connect(path: &Path) -> Result<Self, ClientError> {
let stream = UnixStream::connect(path)?;
stream.set_read_timeout(Some(Duration::from_secs(30)))?;
stream.set_write_timeout(Some(Duration::from_secs(10)))?;
let reader = BufReader::new(stream.try_clone()?);
Ok(Self { stream, reader, next_id: 1 })
}
/// Connect to the default socket path.
pub fn connect_default() -> Result<Self, ClientError> {
Self::connect(&crate::socket::default_path())
}
/// Send a raw RPC call.
pub fn call(&mut self, method: &str, params: serde_json::Value) -> Result<RpcResponse, ClientError> {
let request = RpcRequest::new(self.next_id, method, params);
self.next_id += 1;
// Send
let json = serde_json::to_string(&request)?;
writeln!(self.stream, "{}", json)?;
self.stream.flush()?;
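// Receive
let mut line = String::new();
if self.reader.read_line(&mut line)? == 0 {
// EOF: the server closed the connection without replying
return Err(std::io::Error::from(std::io::ErrorKind::UnexpectedEof).into());
}
let response: RpcResponse = serde_json::from_str(&line)?;
Ok(response)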
}
// ─────────────────────────────────────────────────────────────
// Typed convenience methods
// ─────────────────────────────────────────────────────────────
/// Ping the server, returns version.
pub fn ping(&mut self) -> Result<String, ClientError> {
let resp = self.call("system.ping", serde_json::json!({}))?;
let val: serde_json::Value = resp.into_result()?;
Ok(val.get("version")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string())
}
/// Shutdown the server.
pub fn shutdown(&mut self) -> Result<(), ClientError> {
let resp = self.call("system.shutdown", serde_json::json!({}))?;
let _: bool = resp.into_result()?;
Ok(())
}
/// List all services.
pub fn list(&mut self) -> Result<Vec<ServiceInfo>, ClientError> {
let resp = self.call("service.list", serde_json::json!({}))?;
resp.into_result().map_err(Into::into)
}
/// Get detailed status of a service.
pub fn status(&mut self, name: &str) -> Result<ServiceStatus, ClientError> {
let resp = self.call("service.status", serde_json::json!({"name": name}))?;
resp.into_result().map_err(Into::into)
}
/// Start a service.
pub fn start(&mut self, name: &str) -> Result<(), ClientError> {
let resp = self.call("service.start", serde_json::json!({"name": name}))?;
let _: serde_json::Value = resp.into_result()?;
Ok(())
}
/// Stop a service.
pub fn stop(&mut self, name: &str) -> Result<(), ClientError> {
let resp = self.call("service.stop", serde_json::json!({"name": name}))?;
let _: serde_json::Value = resp.into_result()?;
Ok(())
}
/// Restart a service.
pub fn restart(&mut self, name: &str) -> Result<(), ClientError> {
let resp = self.call("service.restart", serde_json::json!({"name": name}))?;
let _: serde_json::Value = resp.into_result()?;
Ok(())
}
/// Send a signal to a service.
pub fn kill(&mut self, name: &str, signal: Option<&str>) -> Result<(), ClientError> {
let params = match signal {
Some(s) => serde_json::json!({"name": name, "signal": s}),
None => serde_json::json!({"name": name}),
};
let resp = self.call("service.kill", params)?;
let _: serde_json::Value = resp.into_result()?;
Ok(())
}
/// Explain why a service is blocked.
pub fn why(&mut self, name: &str) -> Result<WhyBlocked, ClientError> {
let resp = self.call("service.why", serde_json::json!({"name": name}))?;
resp.into_result().map_err(Into::into)
}
/// Get dependency tree.
pub fn tree(&mut self) -> Result<String, ClientError> {
let resp = self.call("service.tree", serde_json::json!({}))?;
let tree: TreeResponse = resp.into_result()?;
Ok(tree.ascii)
}
/// Add a service at runtime.
pub fn add(&mut self, config: ServiceConfig) -> Result<(), ClientError> {
let resp = self.call("service.add", serde_json::json!({"config": config}))?;
let _: serde_json::Value = resp.into_result()?;
Ok(())
}
/// Remove a service.
pub fn remove(&mut self, name: &str) -> Result<(), ClientError> {
let resp = self.call("service.remove", serde_json::json!({"name": name}))?;
let _: serde_json::Value = resp.into_result()?;
Ok(())
}
/// Reload configuration from disk.
pub fn reload(&mut self) -> Result<ReloadResult, ClientError> {
let resp = self.call("service.reload", serde_json::json!({}))?;
resp.into_result().map_err(Into::into)
}
/// Get logs for a service.
pub fn logs(&mut self, name: &str, lines: Option<usize>) -> Result<Vec<LogLine>, ClientError> {
let params = match lines {
Some(n) => serde_json::json!({"name": name, "lines": n}),
None => serde_json::json!({"name": name}),
};
let resp = self.call("logs.tail", params)?;
resp.into_result().map_err(Into::into)
}
}
```
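`ZinitClient::call` frames each message as one JSON document per line. The sketch below shows that framing in isolation using a connected socket pair and a stand-in server thread. The JSON-RPC 2.0 envelope fields (`jsonrpc`, `id`, `method`, `params`, `result`) are an assumption based on the error codes used in this document, and `roundtrip` and `DEMO_REPLY` are hypothetical names.

```rust
use std::io::{self, BufRead, BufReader, Write};
use std::os::unix::net::UnixStream;
use std::thread;

/// Canned reply used by the stand-in server below (assumed envelope shape).
const DEMO_REPLY: &str = r#"{"jsonrpc":"2.0","id":1,"result":{"version":"demo"}}"#;

/// Send one request line over a socket pair.
/// Returns (line as seen by the server, reply line as seen by the client).
fn roundtrip(request: &str) -> io::Result<(String, String)> {
    let (mut client, mut server) = UnixStream::pair()?;

    // Stand-in server: read exactly one line, answer with exactly one line.
    let handle = thread::spawn(move || -> io::Result<String> {
        let mut seen = String::new();
        BufReader::new(server.try_clone()?).read_line(&mut seen)?;
        writeln!(server, "{}", DEMO_REPLY)?;
        Ok(seen.trim_end().to_string())
    });

    // Client side: same framing as ZinitClient::call (write a line, read a line).
    writeln!(client, "{}", request)?;
    client.flush()?;
    let mut reply = String::new();
    BufReader::new(client.try_clone()?).read_line(&mut reply)?;

    let seen = handle.join().expect("server thread panicked")?;
    Ok((seen, reply.trim_end().to_string()))
}
```

Because the newline is the only delimiter, a request must be serialized without embedded raw newlines, which `serde_json::to_string` guarantees for compact output.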
### 11.12 lib.rs
```rust
pub mod state;
pub mod config;
pub mod protocol;
pub mod responses;
pub mod socket;
pub mod signal;
pub mod validate;
pub mod client;
// Re-exports for convenience
pub use state::{ServiceState, FailureReason};
pub use config::{ServiceConfig, TargetConfig, DependencyDef, LifecycleDef, RestartPolicy};
pub use protocol::{RpcRequest, RpcResponse, RpcError, error_codes};
pub use responses::{ServiceInfo, ServiceStatus, LogLine, LogStream, DepType, ReloadResult};
pub use client::{ZinitClient, ClientError};
```
### 11.13 Usage Examples
**zinit-server:**
```rust
use zinit_common::{
ServiceState, FailureReason, ServiceConfig,
RpcRequest, RpcResponse, error_codes,
validate,
};
// Parse incoming RPC
let request: RpcRequest = serde_json::from_str(&line)?;
// Validate config before adding
let errors = validate::validate_service(&config);
if !errors.is_empty() {
return RpcResponse::error(request.id, error_codes::INVALID_CONFIG, errors.join(", "));
}
```
**zinit-client CLI:**
```rust
use zinit_common::{ZinitClient, ServiceState};
fn main() {
let mut client = ZinitClient::connect_default().unwrap();
for service in client.list().unwrap() {
println!("{} {} ({})",
service.state.symbol(),
service.name,
service.state.name()
);
}
}
```
**zinit-pid1:**
```rust
use zinit_common::ZinitClient;
fn health_check() -> bool {
match ZinitClient::connect_default() {
Ok(mut client) => client.ping().is_ok(),
Err(_) => false,
}
}
```
---
## 12. Project Structure
### 12.1 Workspace Layout
```
zinit/
├── Cargo.toml # Workspace definition
├── zinit-common/ # Shared types and blocking client
├── zinit-server/ # Service supervisor
├── zinit-client/ # CLI/TUI/REPL/Rhai
├── zinit-pid1/ # Init shim
├── sysvol/ # Volume management utility
└── tests/integration/ # Integration tests
```
### 12.2 zinit-common (shared crate)
```
zinit-common/
├── Cargo.toml
└── src/
    ├── lib.rs            # Re-exports
    ├── state.rs          # ServiceState, FailureReason
    ├── config.rs         # ServiceConfig, DependencyDef, HealthDef, etc.
    ├── protocol.rs       # RpcRequest, RpcResponse, error_codes
    ├── responses.rs      # ServiceStatus, ServiceInfo, LogLine, etc.
    ├── socket.rs         # Path constants and helpers
    ├── signal.rs         # Signal name/number conversion
    ├── validate.rs       # Config validation
    ├── client.rs         # Blocking ZinitClient
    ├── async_client.rs   # Async client (feature-gated)
    └── xinet.rs          # Xinet configuration types
```
### 12.3 zinit-server
```
zinit-server/
├── Cargo.toml
└── src/
    ├── main.rs               # Entry point, CLI args, signal handling
    ├── lib.rs                # Re-exports
    ├── graph.rs              # ServiceGraph, dependency management, can_start()
    ├── process.rs            # Process spawning, signals, health checks
    ├── log.rs                # LogBuffer ring buffer
    ├── ipc.rs                # Unix socket server, RPC dispatch
    ├── error.rs              # Server-specific error types
    ├── debug.rs              # Debug utilities
    ├── state.rs              # State persistence (PersistedState)
    ├── syslog.rs             # Syslog integration
    │
    ├── supervisor/
    │   ├── mod.rs            # Supervisor struct, event loop
    │   ├── events.rs         # SupervisorEvent enum, TimeoutKind
    │   ├── handlers.rs       # Event handlers (process exit, health, timeout)
    │   ├── spawning.rs       # try_start_service(), process spawning
    │   ├── persistence.rs    # State save/restore across restarts
    │   ├── api.rs            # IPC command API handlers
    │   └── builtins.rs       # Built-in service support
    │
    └── xinet/
        ├── mod.rs            # Socket activation module
        ├── manager.rs        # XinetManager
        ├── proxy.rs          # XinetProxy
        └── connection.rs     # Connection handling
```
### 12.4 zinit-client
```
zinit-client/
├── Cargo.toml
└── src/
    ├── main.rs                   # CLI entry point (supports piped stdin)
    │
    ├── cli/
    │   ├── mod.rs
    │   ├── args.rs               # Clap argument definitions
    │   └── commands.rs           # Command implementations
    │
    ├── tui.rs                    # TUI interface (feature-gated)
    ├── repl.rs                   # Interactive REPL (feature-gated)
    │
    └── rhai/                     # Rhai scripting (feature-gated)
        ├── mod.rs
        ├── engine.rs             # Rhai script engine
        └── service_builder.rs    # Service configuration DSL
```
### 12.5 zinit-pid1
```
zinit-pid1/
├── Cargo.toml
└── src/
    └── main.rs           # Single-file implementation
                          # - Signal handling (SIGTERM, SIGINT, SIGUSR1/2)
                          # - Zombie reaping
                          # - Server spawn/restart
                          # - Container vs VM/bare-metal detection
                          # - Reboot syscall handling
```
---
## 13. Crate Dependencies
### 13.1 Workspace Cargo.toml
```toml
[workspace]
resolver = "2"
members = [
"zinit-common",
"zinit-server",
"zinit-client",
"zinit-pid1",
]
[workspace.package]
version = "0.1.0"
edition = "2021"
license = "Apache-2.0"
[workspace.dependencies]
# Shared
serde = { version = "1", features = ["derive"] }
serde_json = "1"
toml = "0.8"
# Async
tokio = { version = "1", features = ["full"] }  # "full" already includes "process"
# Graph
petgraph = "0.6"
# Unix
nix = { version = "0.27", features = ["signal", "process", "reboot"] }  # "reboot" gates nix::sys::reboot (used by zinit-pid1)
libc = "0.2"
# Error handling
thiserror = "1"
# Logging
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
# CLI
clap = { version = "4", features = ["derive"] }
```
### 13.2 zinit-common/Cargo.toml
```toml
[package]
name = "zinit-common"
version.workspace = true
edition.workspace = true
[dependencies]
serde.workspace = true
serde_json.workspace = true
```
### 13.3 zinit-server/Cargo.toml
```toml
[package]
name = "zinit-server"
version.workspace = true
edition.workspace = true
[dependencies]
zinit-common = { path = "../zinit-common" }
serde.workspace = true
serde_json.workspace = true
toml.workspace = true
tokio.workspace = true
petgraph.workspace = true
nix.workspace = true
libc.workspace = true
thiserror.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
[dev-dependencies]
tempfile = "3"
tokio-test = "0.4"
```
### 13.4 zinit-client/Cargo.toml
```toml
[package]
name = "zinit-client"
version.workspace = true
edition.workspace = true
[dependencies]
zinit-common = { path = "../zinit-common" }
serde.workspace = true
serde_json.workspace = true
clap.workspace = true
tokio = { workspace = true, optional = true }
# TUI (optional)
ratatui = { version = "0.25", optional = true }
crossterm = { version = "0.27", optional = true }
[features]
default = []
tui = ["ratatui", "crossterm"]
async = ["tokio"]
```
### 13.5 zinit-pid1/Cargo.toml
```toml
[package]
name = "zinit-pid1"
version.workspace = true
edition.workspace = true
[dependencies]
zinit-common = { path = "../zinit-common" }
nix.workspace = true
libc.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
```
---
## 14. zinit-pid1 Specification
### 14.1 Overview
zinit-pid1 is the init process (PID 1). It must:
- Spawn and monitor zinit-server
- Reap orphaned zombies
- Forward signals to zinit-server
- Handle system shutdown/reboot
- **Never exit** on VM/bare-metal (the kernel panics if PID 1 exits, whatever the exit code)
### 14.2 Core Loop
```rust
use nix::sys::signal::{self, Signal, SigHandler, SigSet, SigAction, SaFlags};
use nix::sys::wait::{waitpid, WaitPidFlag, WaitStatus};
use nix::unistd::{fork, ForkResult, Pid, execv, getpid};
use nix::sys::reboot::{reboot, RebootMode};
use std::ffi::CString;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread;
use zinit_common::ZinitClient;
// Signal flags (set by signal handlers)
static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false);
static SIGINT_RECEIVED: AtomicBool = AtomicBool::new(false);
static SIGUSR1_RECEIVED: AtomicBool = AtomicBool::new(false); // soft restart server
static SIGUSR2_RECEIVED: AtomicBool = AtomicBool::new(false); // self update
static SIGCHLD_RECEIVED: AtomicBool = AtomicBool::new(false);
// Shutdown mode
#[derive(Debug, Clone, Copy, PartialEq)] // Debug is needed for the {:?} log in do_shutdown
enum ShutdownMode {
None,
Reboot,
Poweroff,
}
fn main() {
// Sanity check: we should be PID 1
if getpid().as_raw() != 1 {
eprintln!("Warning: not running as PID 1");
}
// 1. Setup signal handlers
setup_signals();
// 2. Spawn zinit-server
let mut server_pid = spawn_server();
// 3. Main loop
let mut shutdown_mode = ShutdownMode::None;
loop {
// Check signals
if SIGCHLD_RECEIVED.swap(false, Ordering::SeqCst) {
server_pid = reap_zombies(server_pid);
}
if SIGTERM_RECEIVED.swap(false, Ordering::SeqCst) {
shutdown_mode = ShutdownMode::Poweroff;
break;
}
if SIGINT_RECEIVED.swap(false, Ordering::SeqCst) {
shutdown_mode = ShutdownMode::Reboot;
break;
}
if SIGUSR1_RECEIVED.swap(false, Ordering::SeqCst) {
// Soft restart: tell server to exit, respawn it
soft_restart_server(&mut server_pid);
}
if SIGUSR2_RECEIVED.swap(false, Ordering::SeqCst) {
// Self update: check and apply update to pid1 binary
// Server handles its own update via SIGUSR1
handle_self_update();
}
// If server died unexpectedly, respawn it
if server_pid.is_none() {
eprintln!("zinit-server died, respawning...");
thread::sleep(Duration::from_secs(1));
server_pid = spawn_server();
}
// Sleep briefly to avoid busy loop
thread::sleep(Duration::from_millis(100));
}
// 4. Shutdown sequence
do_shutdown(server_pid, shutdown_mode);
}
```
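The loop above relies on a test-and-clear pattern: the handler only sets a flag, and the main loop reads and resets it in one atomic step with `swap`. A minimal sketch of that pattern follows; `raise` and `take` are hypothetical names standing in for the handler and the loop check.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// What a signal handler does: record that the event happened.
/// A plain atomic store is safe to perform from a signal handler.
fn raise(flag: &AtomicBool) {
    flag.store(true, Ordering::SeqCst);
}

/// What the main loop does: read the flag and clear it in one atomic step,
/// so an event arriving between "read" and "clear" cannot be lost.
fn take(flag: &AtomicBool) -> bool {
    flag.swap(false, Ordering::SeqCst)
}
```

Flags coalesce: two signals delivered before the next loop iteration are observed as a single `true`. That is why `reap_zombies` loops until `waitpid` reports nothing left, rather than reaping one child per `SIGCHLD`.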
### 14.3 Signal Setup
```rust
extern "C" fn signal_handler(sig: i32) {
match sig {
libc::SIGTERM => SIGTERM_RECEIVED.store(true, Ordering::SeqCst),
libc::SIGINT => SIGINT_RECEIVED.store(true, Ordering::SeqCst),
libc::SIGUSR1 => SIGUSR1_RECEIVED.store(true, Ordering::SeqCst),
libc::SIGUSR2 => SIGUSR2_RECEIVED.store(true, Ordering::SeqCst),
libc::SIGCHLD => SIGCHLD_RECEIVED.store(true, Ordering::SeqCst),
_ => {}
}
}
fn setup_signals() {
let handler = SigHandler::Handler(signal_handler);
let flags = SaFlags::SA_RESTART;
let action = SigAction::new(handler, flags, SigSet::empty());
unsafe {
signal::sigaction(Signal::SIGTERM, &action).unwrap();
signal::sigaction(Signal::SIGINT, &action).unwrap();
signal::sigaction(Signal::SIGUSR1, &action).unwrap();
signal::sigaction(Signal::SIGUSR2, &action).unwrap();
signal::sigaction(Signal::SIGCHLD, &action).unwrap();
}
}
```
### 14.4 Zombie Reaping
```rust
/// Reap all zombies. Returns None if server died, Some(pid) otherwise.
fn reap_zombies(server_pid: Option<Pid>) -> Option<Pid> {
let mut server_alive = server_pid;
loop {
match waitpid(Pid::from_raw(-1), Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(pid, code)) => {
eprintln!("Process {} exited with code {}", pid, code);
if Some(pid) == server_pid {
server_alive = None;
}
}
Ok(WaitStatus::Signaled(pid, sig, _)) => {
eprintln!("Process {} killed by signal {:?}", pid, sig);
if Some(pid) == server_pid {
server_alive = None;
}
}
Ok(WaitStatus::StillAlive) => break,
Err(nix::errno::Errno::ECHILD) => break, // no children
_ => break,
}
}
server_alive
}
```
### 14.5 Server Management
```rust
fn spawn_server() -> Option<Pid> {
match unsafe { fork() } {
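Ok(ForkResult::Child) => {
// Child: exec zinit-server
let prog = CString::new("/usr/bin/zinit-server").unwrap();
let args = [prog.clone()];
// execv only returns on failure
let err = execv(&prog, &args).unwrap_err();
eprintln!("Failed to exec zinit-server: {}", err);
// Leave the forked child immediately instead of panicking and unwinding pid1's stack
unsafe { libc::_exit(127) }
}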
Ok(ForkResult::Parent { child }) => {
eprintln!("Spawned zinit-server with PID {}", child);
Some(child)
}
Err(e) => {
eprintln!("Failed to fork: {}", e);
None
}
}
}
fn soft_restart_server(server_pid: &mut Option<Pid>) {
if let Some(pid) = *server_pid {
// Tell server to exit gracefully (without stopping services)
let _ = signal::kill(pid, Signal::SIGUSR1);
// Wait for it to exit (with timeout)
for _ in 0..50 { // 5 seconds
thread::sleep(Duration::from_millis(100));
match waitpid(pid, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {
*server_pid = None;
break;
}
_ => {}
}
}
// Force kill if still alive
if server_pid.is_some() {
let _ = signal::kill(pid, Signal::SIGKILL);
let _ = waitpid(pid, None);
*server_pid = None;
}
}
// Respawn
*server_pid = spawn_server();
}
```
### 14.6 Shutdown Sequence
```rust
fn do_shutdown(server_pid: Option<Pid>, mode: ShutdownMode) {
eprintln!("Initiating shutdown (mode: {:?})", mode);
// 1. Tell zinit-server to stop all services
if let Ok(mut client) = ZinitClient::connect_default() {
let _ = client.shutdown();
}
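// 2. Wait for server to exit
if let Some(pid) = server_pid {
let mut exited = false;
// Give it 30 seconds
for _ in 0..300 {
thread::sleep(Duration::from_millis(100));
match waitpid(pid, Some(WaitPidFlag::WNOHANG)) {
Ok(WaitStatus::Exited(_, _)) | Ok(WaitStatus::Signaled(_, _, _)) => {
exited = true;
break;
}
// Already reaped elsewhere
Err(nix::errno::Errno::ECHILD) => {
exited = true;
break;
}
_ => {}
}
}
// Force kill only if still alive: once reaped, the PID may belong to another process
if !exited {
let _ = signal::kill(pid, Signal::SIGKILL);
let _ = waitpid(pid, None);
}
}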
// 3. Reap any remaining zombies
loop {
match waitpid(Pid::from_raw(-1), Some(WaitPidFlag::WNOHANG)) {
Err(nix::errno::Errno::ECHILD) => break,
Ok(WaitStatus::StillAlive) => break,
_ => continue,
}
}
// 4. Sync filesystems
unsafe { libc::sync(); }
// 5. Container vs bare-metal/VM
if is_container() {
// Container: exit, let runtime handle restart policy
eprintln!("Container detected, exiting...");
match mode {
ShutdownMode::Poweroff | ShutdownMode::Reboot => std::process::exit(0),
ShutdownMode::None => std::process::exit(1),
}
}
// 6. Bare-metal/VM: reboot syscall - NEVER exit!
match mode {
ShutdownMode::Reboot => {
eprintln!("Rebooting...");
let _ = reboot(RebootMode::RB_AUTOBOOT);
}
ShutdownMode::Poweroff => {
eprintln!("Powering off...");
let _ = reboot(RebootMode::RB_POWER_OFF);
}
ShutdownMode::None => {
eprintln!("Unexpected shutdown state, rebooting...");
let _ = reboot(RebootMode::RB_AUTOBOOT);
}
}
// If reboot syscall failed, loop forever (don't exit!)
eprintln!("Reboot syscall failed, halting...");
loop {
thread::sleep(Duration::from_secs(3600));
}
}
```
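Steps 5 and 6 of the shutdown sequence amount to a small decision table over two inputs. The sketch below restates it as a pure function so the mapping can be checked in isolation; `FinalAction` and `final_action` are hypothetical names, and `ShutdownMode` is redeclared only to keep the example self-contained.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ShutdownMode {
    None,
    Reboot,
    Poweroff,
}

/// What pid1 does as its very last step (illustrative type).
#[derive(Debug, PartialEq)]
enum FinalAction {
    /// Container: exit with this status and let the runtime decide what happens next.
    Exit(i32),
    /// Bare-metal/VM: reboot(2) with RB_AUTOBOOT.
    Reboot,
    /// Bare-metal/VM: reboot(2) with RB_POWER_OFF.
    Poweroff,
}

fn final_action(in_container: bool, mode: ShutdownMode) -> FinalAction {
    match (in_container, mode) {
        // Container with no requested mode is treated as an abnormal exit.
        (true, ShutdownMode::None) => FinalAction::Exit(1),
        (true, _) => FinalAction::Exit(0),
        (false, ShutdownMode::Poweroff) => FinalAction::Poweroff,
        // Reboot, and the unexpected None case, both reboot.
        (false, _) => FinalAction::Reboot,
    }
}
```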
### 14.7 Auto-Update (Placeholder)
```rust
fn handle_self_update() {
// TODO: Implement self-update
// 1. Check for new zinit-pid1 binary (from where?)
// 2. Verify signature/checksum
// 3. Replace binary
// 4. Re-exec self (tricky as PID 1!)
//
// Re-exec as PID 1:
// execv("/usr/bin/zinit-pid1", argv)
// This replaces the current process image but keeps PID 1
eprintln!("Self-update not yet implemented");
}
// Server update is handled by:
// 1. Server checks for updates periodically or on SIGUSR2
// 2. Server downloads new binary
// 3. Server signals pid1 with SIGUSR1
// 4. pid1 does soft_restart_server() which respawns with new binary
```
### 14.8 Container Detection
On bare-metal/VM, PID 1 is the actual system init. In a container, PID 1 runs inside a PID namespace - the kernel outside sees a different PID.
`/proc/1/sched` reveals this: it shows the *real* PID outside the namespace.
```rust
/// Detect if we're running in a container (PID namespace).
/// Returns true if we're PID 1 inside a namespace but not on the host.
fn is_container() -> bool {
std::fs::read_to_string("/proc/1/sched")
.ok()
.and_then(|s| {
// Format: "zinit-pid1 (12345, #threads: 1)"
// The number in parentheses is the real PID outside the namespace
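let start = s.find('(')? + 1;
// Look for the comma after the opening parenthesis so the slice bounds stay ordered
let end = start + s[start..].find(',')?;
s[start..end].trim().parse::<u32>().ok()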
})
.map(|real_pid| real_pid != 1)
.unwrap_or(false)
}
```
- **Container**: `real_pid` is something like 54321 → `is_container() == true` → `exit(0)` is fine
- **VM/bare-metal**: `real_pid` is 1 → `is_container() == false` → must use `reboot()` syscall
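This works with any Linux container runtime (Docker, Podman, LXC, Kubernetes) without runtime-specific checks, as long as the kernel reports the host-side PID in `/proc/1/sched`. If the kernel reports the namespace-local PID instead, `real_pid` is 1 inside the container as well, `is_container()` returns `false`, and pid1 takes the bare-metal path.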
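
The parsing step can be isolated into a pure function, which makes it testable without a real `/proc`. The sketch below is an illustration under that assumption; `parse_sched_pid` and `looks_like_container` are hypothetical names, and the sample input mirrors the format quoted in the code comment above.

```rust
/// Extract the PID from the first line of /proc/<pid>/sched,
/// e.g. "zinit-pid1 (12345, #threads: 1)".
fn parse_sched_pid(contents: &str) -> Option<u32> {
    let first_line = contents.lines().next()?;
    // Use the last '(' so a process name that itself contains parentheses still parses.
    let open = first_line.rfind('(')?;
    let rest = &first_line[open + 1..];
    let comma = rest.find(',')?;
    rest[..comma].trim().parse().ok()
}

/// Same decision as is_container(): any reported PID other than 1 means "namespaced".
/// Unparseable input falls back to false, i.e. the bare-metal path.
fn looks_like_container(contents: &str) -> bool {
    parse_sched_pid(contents).map(|pid| pid != 1).unwrap_or(false)
}
```

Falling back to `false` on a parse failure is the conservative choice: treating a real machine as a container would let PID 1 exit.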
### 14.9 Signal Protocol Summary
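**Signals handled by zinit-pid1:**

| Signal | Source | Action |
|--------|--------|--------|
| `SIGTERM` | External | Poweroff system |
| `SIGINT` | External | Reboot system |
| `SIGUSR1` | External | Soft restart zinit-server (for updates) |
| `SIGUSR2` | External | Check for self-update |
| `SIGCHLD` | Kernel | Reap zombies |

**Signals handled by zinit-server:**

| Signal | Source | Action |
|--------|--------|--------|
| `SIGTERM` | From pid1 | Full shutdown (stop all services, exit) |
| `SIGUSR1` | From pid1 | Soft exit (exit without stopping services) |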
---
## 15. Xinet Socket Activation
Xinet provides socket activation - on-demand service startup when connections arrive.
### 15.1 Overview
```
┌─────────────────────────────────────────────────────────────────┐
│  XinetManager                                                   │
│  - Manages multiple XinetProxy instances                        │
│  - Registers/unregisters proxies via RPC                        │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 │ manages
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│  XinetProxy                                                     │
│  - Listens on specified port                                    │
│  - Triggers service start on first connection                   │
│  - Proxies connections to backend service                       │
└─────────────────────────────────────────────────────────────────┘
```
### 15.2 Configuration
```rust
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct XinetConfig {
pub name: String, // Proxy name
pub listen_port: u16, // Port to listen on
pub target_service: String, // Service to start on connection
pub target_port: u16, // Port to proxy to
}
```
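To make the listen, start, and proxy flow concrete, the following sketch handles a single connection using only blocking `std::net` calls. It is not the XinetProxy implementation: `proxy_once` and the `start_service` callback are hypothetical, the backend is assumed to be on `127.0.0.1`, and a real proxy would also wait for the service to become ready and keep accepting connections.

```rust
use std::io;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Accept one connection on `listener`, call `start_service`, then relay bytes
/// between the client and 127.0.0.1:`target_port` until both directions close.
fn proxy_once(
    listener: &TcpListener,
    target_port: u16,
    start_service: impl FnOnce(),
) -> io::Result<()> {
    let (client, _) = listener.accept()?;
    // On-demand start: triggered by the connection, before dialing the backend.
    start_service();
    let backend = TcpStream::connect(("127.0.0.1", target_port))?;

    // Client -> backend runs on its own thread.
    let mut client_rx = client.try_clone()?;
    let mut backend_tx = backend.try_clone()?;
    let upstream = thread::spawn(move || {
        let _ = io::copy(&mut client_rx, &mut backend_tx);
        let _ = backend_tx.shutdown(Shutdown::Write);
    });

    // Backend -> client runs on the calling thread.
    let (mut backend_rx, mut client_tx) = (backend, client);
    io::copy(&mut backend_rx, &mut client_tx)?;
    let _ = client_tx.shutdown(Shutdown::Write);
    let _ = upstream.join();
    Ok(())
}
```

In terms of `XinetConfig`, `listener` would be bound to `listen_port`, `start_service` would request a start of `target_service`, and `target_port` is passed through unchanged.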
### 15.3 RPC Methods
| Method | Params | Returns | Description |
|--------|--------|---------|-------------|
| `xinet.register` | `{config}` | `{ok}` | Register a new proxy |
| `xinet.unregister` | `{name}` | `{ok}` | Remove a proxy |
| `xinet.list` | `{}` | `[names]` | List all proxies |
| `xinet.status` | `{name}` | `{status}` | Get proxy status |
| `xinet.status_all` | `{}` | `[{name, status}]` | Get all proxy statuses |
---
## 16. State Persistence
State persistence enables zero-downtime restarts by preserving service states.
### 16.1 Persisted State Structure
```rust
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistedState {
pub schema_version: u32, // For forward compatibility
pub timestamp: u64, // When state was saved
pub boot_time: u64, // Original boot time
pub services: Vec<PersistedService>,
}
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistedService {
pub name: String,
pub state: ServiceState,
pub pid: Option<u32>,
pub restart_count: u32,
pub started_at: Option<u64>,
}
```
### 16.2 File Locations
| Path | Contents |
|------|----------|
| `/run/zinit/state.json` | Service states and metadata |
| `/run/zinit/fds.json` | File descriptor mapping (for FD passing) |
### 16.3 TTL and Restoration
- **TTL**: 5 minutes - persisted state older than this is ignored
- **Restoration**: On startup, if valid state exists:
  1. Restore service states (`Running` services are re-validated by checking PIDs)
  2. Restore `boot_time` (for uptime calculations)
  3. Restore `restart_count` (for backoff continuity)
  4. Skip starting services that are already running
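
The TTL check and the PID re-validation above can be sketched as follows. This is an illustration, not the server's code: it assumes `timestamp` is in seconds since the Unix epoch, that a `Running` entry whose process is gone is demoted to an inactive state, and that liveness is probed via `/proc` (Linux only). `State`, `is_fresh`, `pid_alive`, and `revalidate` are hypothetical names.

```rust
use std::path::Path;

/// 5-minute TTL from the text above, in seconds.
const STATE_TTL_SECS: u64 = 5 * 60;

/// Simplified stand-in for ServiceState.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    Running,
    Inactive,
}

/// True if a snapshot saved at `saved_at` is still usable at `now` (both in epoch seconds).
/// A timestamp from the future is treated as invalid rather than fresh.
fn is_fresh(saved_at: u64, now: u64) -> bool {
    now.checked_sub(saved_at)
        .map_or(false, |age| age <= STATE_TTL_SECS)
}

/// Linux-only liveness probe: a live process has a /proc/<pid> directory.
/// This cannot tell whether the PID was reused by an unrelated process.
fn pid_alive(pid: u32) -> bool {
    Path::new("/proc").join(pid.to_string()).exists()
}

/// Re-validate one persisted entry: Running survives only if its PID is still alive.
fn revalidate(state: State, pid: Option<u32>, alive: impl Fn(u32) -> bool) -> State {
    match (state, pid) {
        (State::Running, Some(pid)) if alive(pid) => State::Running,
        (State::Running, _) => State::Inactive,
        (other, _) => other,
    }
}
```

In use, `revalidate(entry.state, entry.pid, pid_alive)` would run once per `PersistedService` after `is_fresh` has accepted the file. Passing the probe as a closure keeps the decision testable without real processes.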
### 16.4 Usage: Soft Restart
```bash
# From zinit-pid1: SIGUSR1 triggers soft restart
# 1. Server saves state to /run/zinit/state.json
# 2. Server exits cleanly (services keep running)
# 3. pid1 respawns server
# 4. New server restores state, adopts running processes
```
---
## Appendix: Quick Reference
### State Symbols
```
[-] = inactive
[?] = blocked
[>] = starting
[+] = running
[!] = stopping
[.] = exited
[X] = failed
```
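The symbol table maps directly onto a pair of lookup methods like the `symbol()` and `name()` calls used in the CLI example of section 11.13. The sketch below uses a simplified, data-free `State` enum as a stand-in; the real `ServiceState` may carry extra data (for example a failure reason), and `list_line` is a hypothetical helper.

```rust
/// Simplified stand-in for ServiceState, one variant per symbol above.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Inactive,
    Blocked,
    Starting,
    Running,
    Stopping,
    Exited,
    Failed,
}

impl State {
    /// Three-character marker shown at the start of each line.
    fn symbol(self) -> &'static str {
        match self {
            State::Inactive => "[-]",
            State::Blocked => "[?]",
            State::Starting => "[>]",
            State::Running => "[+]",
            State::Stopping => "[!]",
            State::Exited => "[.]",
            State::Failed => "[X]",
        }
    }

    /// Lowercase state name.
    fn name(self) -> &'static str {
        match self {
            State::Inactive => "inactive",
            State::Blocked => "blocked",
            State::Starting => "starting",
            State::Running => "running",
            State::Stopping => "stopping",
            State::Exited => "exited",
            State::Failed => "failed",
        }
    }
}

/// One line of list output: symbol, service name, state name.
fn list_line(state: State, service: &str) -> String {
    format!("{} {} {}", state.symbol(), service, state.name())
}
```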
### Example CLI Session
```bash
$ zinit list
[+] network-ready running
[+] sshd running
[>] my-app starting
[?] worker blocked
$ zinit why worker
[?] worker (blocked)
├── requires: database (starting) ← waiting
└── requires: redis (inactive) ← waiting
$ zinit tree
[+] system [target] (running)
├── [+] network-ready [target] (running)
│ ├── [+] dhcp (running)
│ └── [+] dns (running)
└── [>] my-app (starting)
└── [+] network-ready [target] (running)
```
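The `zinit why` output in the session above can be produced from per-dependency records like `DependencyInfo`. The following sketch reproduces that layout; `Dep` and `render_why` are hypothetical, and states are passed as plain strings to keep the example independent of `ServiceState`.

```rust
/// Minimal stand-in for DependencyInfo, with string fields for display only.
struct Dep<'a> {
    dep_type: &'a str,
    name: &'a str,
    state: &'a str,
    satisfied: bool,
}

/// Render a header line followed by one branch line per dependency.
/// The last dependency uses the closing branch glyph.
fn render_why(symbol: &str, name: &str, state: &str, deps: &[Dep]) -> String {
    let mut out = format!("{} {} ({})\n", symbol, name, state);
    for (i, dep) in deps.iter().enumerate() {
        let branch = if i + 1 == deps.len() { "└──" } else { "├──" };
        // Only unsatisfied dependencies are flagged as blocking.
        let note = if dep.satisfied { "" } else { " ← waiting" };
        out.push_str(&format!(
            "{} {}: {} ({}){}\n",
            branch, dep.dep_type, dep.name, dep.state, note
        ));
    }
    out
}
```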