# zinit-server - Supervisor & IPC
Concurrency model, event loop, and RPC handling.
## Concurrency Model
Hybrid approach: **locks for synchronous request/response**, **channels for async events**.
| Operation | Mechanism | Rationale |
|---|---|---|
| RPC queries (list, status, why, tree) | `graph.read()` | Fast, allows concurrent reads |
| RPC mutations (start, stop, add) | `graph.write()` | Atomic check + update |
| Log buffer access | `log_buffers.read/write()` | Separate lock, no graph contention |
| Process exit notification | channel -> event loop | Can't hold lock while waiting on child |
| Health check results | channel -> event loop | Runs async, reports when done |
| Timeout events | channel -> event loop | Timer fires independently |
| Log shipping | `try_send` to shipper task | Fire and forget, never blocks |
## Core Types
```rust
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use tokio::sync::{RwLock, mpsc};
/// Central supervisor state.
///
/// Synchronous request/response paths lock `graph` / `log_buffers`; background
/// tasks report back asynchronously through the `event_tx` -> `event_rx` channel.
pub struct Supervisor {
    /// Dependency graph with service states
    graph: Arc<RwLock<ServiceGraph>>,
    /// Per-service log ring buffers (separate lock = no contention with graph)
    log_buffers: Arc<RwLock<HashMap<ServiceId, LogBuffer>>>,
    /// Sending half of the event channel. Cloned into background tasks (e.g.
    /// timers) so they can report events back to the supervisor.
    event_tx: mpsc::Sender<SupervisorEvent>,
    /// Receiving half of the event channel, drained by the event loop.
    event_rx: mpsc::Receiver<SupervisorEvent>,
    /// Send logs to shipping task (optional, for vector.io)
    log_shipper_tx: Option<mpsc::Sender<LogLine>>,
    /// Active timers keyed by (service, timeout kind). The abort handles let a
    /// pending timer be cancelled before it fires.
    timers: HashMap<(ServiceId, TimeoutKind), tokio::task::AbortHandle>,
    /// Config directory path
    config_dir: PathBuf,
    /// Path of the Unix socket the IPC server listens on
    socket_path: PathBuf,
}
/// Messages that background tasks send to the supervisor's event loop.
///
/// Background work (process watching, health checks, timers) never touches the
/// graph lock while waiting; it reports the outcome through this enum instead.
#[derive(Debug)]
pub enum SupervisorEvent {
    /// A timer of the given `kind` for `service_id` elapsed.
    Timeout {
        service_id: ServiceId,
        kind: TimeoutKind,
    },
    /// A health check for `service_id` completed; `error` holds failure detail, if any.
    HealthCheckResult {
        service_id: ServiceId,
        passed: bool,
        error: Option<String>,
    },
    /// The process backing `service_id` terminated. The event loop treats
    /// `exit_code` as the exit status and `signal` as the terminating signal.
    ProcessExited {
        service_id: ServiceId,
        exit_code: Option<i32>,
        signal: Option<i32>,
    },
}
/// The kind of deadline a supervisor timer represents.
///
/// Paired with a service id it forms the key of the supervisor's active-timer map,
/// so at most one timer of each kind is tracked per service.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum TimeoutKind {
    /// The service is still `Starting`; on expiry it is marked failed and killed.
    Start,
    /// The service is still `Stopping`; on expiry it is sent SIGKILL.
    Stop,
    /// Time to run the next health check (interval taken from the health config).
    HealthCheck,
    /// Backoff delay before the next restart attempt has elapsed.
    RestartDelay,
}
```
## TOCTOU Prevention
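Hold the write lock across both the state check and the state update, so no other caller can act on a stale state. The lock is released only after the service has been marked `Starting` (just before the process is spawned), so a concurrent start request sees `Starting` and gets `TransitionInProgress`: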
```rust
// CORRECT - atomic check + update
fn start_service(&self, name: &str) -> Result<(), Error> {
let mut graph = self.graph.write().await; // exclusive access
let id = graph.get_by_name(name)?;
// check and update under same lock
match graph.get_state(id) {
ServiceState::Inactive | ServiceState::Exited { .. } | ServiceState::Failed { .. } => {
match graph.can_start(id) {
Ok(()) => {
graph.set_state(id, ServiceState::Starting {
pid: 0,
started_at: Instant::now()
});
let config = graph.get(id).config.clone();
drop(graph); // release lock before spawning
self.spawn_process(id, config).await;
Ok(())
}
Err(blocked_reason) => {
graph.set_state(id, ServiceState::Blocked {
waiting_on: vec![],
conflicts_with: vec![],
});
Ok(())
}
}
}
ServiceState::Running { .. } => Err(Error::AlreadyRunning),
ServiceState::Starting { .. } | ServiceState::Stopping { .. } => {
Err(Error::TransitionInProgress)
}
ServiceState::Blocked { .. } => Ok(()),
}
}
```
## Event Loop
```rust
impl Supervisor {
pub async fn run(&mut self) -> Result<(), Error> {
// Start IPC server
let socket_path = self.socket_path.clone();
let supervisor = Arc::new(self.clone());
tokio::spawn(run_ipc_server(socket_path, supervisor.clone()));
// Start all services in dependency order
self.start_all().await;
// Event loop
while let Some(event) = self.event_rx.recv().await {
self.handle_event(event).await;
}
Ok(())
}
async fn handle_event(&mut self, event: SupervisorEvent) {
match event {
SupervisorEvent::ProcessExited { service_id, exit_code, signal } => {
let mut graph = self.graph.write().await;
let new_state = match (exit_code, signal) {
(Some(0), _) => ServiceState::Exited {
exit_code: Some(0),
exited_at: Instant::now(),
},
(Some(code), _) => ServiceState::Failed {
reason: FailureReason::ExitCode(code),
failed_at: Instant::now(),
},
(_, Some(sig)) => ServiceState::Failed {
reason: FailureReason::Signal(sig),
failed_at: Instant::now(),
},
_ => ServiceState::Failed {
reason: FailureReason::ExitCode(-1),
failed_at: Instant::now(),
},
};
graph.set_state(service_id, new_state.clone());
// Notify dependents
let dependents = graph.dependents(service_id);
drop(graph);
for dep_id in dependents {
self.reevaluate_service(dep_id).await;
}
// Maybe restart
self.maybe_schedule_restart(service_id).await;
}
SupervisorEvent::HealthCheckResult { service_id, passed, error } => {
let mut graph = self.graph.write().await;
if passed {
if let ServiceState::Starting { pid, .. } = graph.get_state(service_id).clone() {
graph.set_state(service_id, ServiceState::Running {
pid,
ready_at: Instant::now(),
});
// Reset exponential backoff - service is healthy
graph.get_mut(service_id).unwrap().reset_backoff();
let dependents = graph.dependents(service_id);
drop(graph);
for dep_id in dependents {
self.reevaluate_service(dep_id).await;
}
}
}
}
SupervisorEvent::Timeout { service_id, kind } => {
match kind {
TimeoutKind::Start => {
let mut graph = self.graph.write().await;
if let ServiceState::Starting { pid, .. } = graph.get_state(service_id).clone() {
graph.set_state(service_id, ServiceState::Failed {
reason: FailureReason::StartTimeout,
failed_at: Instant::now(),
});
drop(graph);
let _ = send_signal(pid, Signal::SIGKILL);
}
}
TimeoutKind::Stop => {
let mut graph = self.graph.write().await;
if let ServiceState::Stopping { pid, .. } = graph.get_state(service_id).clone() {
let _ = send_signal(pid, Signal::SIGKILL);
graph.set_state(service_id, ServiceState::Exited {
exit_code: None,
exited_at: Instant::now(),
});
}
}
TimeoutKind::RestartDelay => {
self.try_start_service(service_id).await;
}
TimeoutKind::HealthCheck => {
self.run_health_check(service_id).await;
}
}
}
}
}
}
```
## Timer Management
```rust
impl Supervisor {
/// Schedule a restart with exponential backoff.
/// Returns true if restart was scheduled, false if max restarts exceeded.
async fn maybe_schedule_restart(&mut self, service_id: ServiceId) -> bool {
let mut graph = self.graph.write().await;
let service = graph.get_mut(service_id).unwrap();
if let Some(delay) = service.next_restart_delay() {
drop(graph);
// Schedule restart after delay
let event_tx = self.event_tx.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(delay).await;
let _ = event_tx.send(SupervisorEvent::Timeout {
service_id,
kind: TimeoutKind::RestartDelay
}).await;
});
self.timers.insert((service_id, TimeoutKind::RestartDelay), handle.abort_handle());
true
} else {
// Max restarts exceeded or policy says no restart
tracing::warn!(
service = %service.name,
restart_count = service.restart_count,
"Max restarts exceeded, giving up"
);
false
}
}
async fn schedule_timeout(&mut self, service_id: ServiceId, kind: TimeoutKind) {
let duration = match kind {
TimeoutKind::Start => {
let graph = self.graph.read().await;
Duration::from_millis(graph.get(service_id).unwrap().config.lifecycle.start_timeout_ms)
}
TimeoutKind::Stop => {
let graph = self.graph.read().await;
Duration::from_millis(graph.get(service_id).unwrap().config.lifecycle.stop_timeout_ms)
}
TimeoutKind::RestartDelay => {
// Don't use this - use maybe_schedule_restart() for exponential backoff
panic!("Use maybe_schedule_restart() for restart scheduling");
}
TimeoutKind::HealthCheck => {
let graph = self.graph.read().await;
if let Some(health) = &graph.get(service_id).unwrap().config.health {
Duration::from_millis(health.common().interval_ms)
} else {
return;
}
}
};
let event_tx = self.event_tx.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(duration).await;
let _ = event_tx.send(SupervisorEvent::Timeout { service_id, kind }).await;
});
self.timers.insert((service_id, kind), handle.abort_handle());
}
fn cancel_timeout(&mut self, service_id: ServiceId, kind: TimeoutKind) {
if let Some(handle) = self.timers.remove(&(service_id, kind)) {
handle.abort();
}
}
}
```
**Exponential backoff behavior:**
With defaults: `restart_delay_ms: 1000`, `restart_delay_max_ms: 300000`, `max_restarts: 10`
```
Crash #1 → wait 1s
Crash #2 → wait 2s
Crash #3 → wait 4s
Crash #4 → wait 8s
Crash #5 → wait 16s
Crash #6 → wait 32s
Crash #7 → wait 64s
Crash #8 → wait 128s (~2 min)
Crash #9 → wait 256s (~4 min)
Crash #10 → wait 300s (capped at 5 min)
Crash #11 → give up, stay Failed
```
Total delay before giving up: 1+2+4+8+16+32+64+128+256+300 = 811 s (~13.5 minutes). When the service reaches the Running state, the backoff resets.
---
## IPC Protocol
### Transport
Unix socket only: `/var/run/zinit.sock` or `~/hero/var/zinit.sock`
### Protocol
JSON-RPC 2.0, newline-delimited.
### Methods
| Method | Params | Result | Description |
|---|---|---|---|
| `system.ping` | `{}` | `{version}` | Health check |
| `system.shutdown` | `{}` | `bool` | Stop all services and exit |
| `service.list` | `{}` | `[{name, state, pid?}]` | List all services |
| `service.status` | `{name}` | `{name, state, pid?, ...}` | Detailed status |
| `service.start` | `{name}` | `{ok}` | Start service |
| `service.stop` | `{name}` | `{ok}` | Stop service |
| `service.restart` | `{name}` | `{ok}` | Stop then start |
| `service.kill` | `{name, signal?}` | `{ok}` | Send signal |
| `service.why` | `{name}` | `{blocked, reason, ascii}` | Explain blocking |
| `service.tree` | `{}` | `{ascii}` | Dependency tree |
| `service.add` | `{config}` | `{ok}` | Add service at runtime |
| `service.remove` | `{name}` | `{ok}` | Remove service |
| `service.reload` | `{}` | `{added, removed, changed}` | Reload from disk |
| `logs.get` | `{name}` | `[{timestamp, stream, content}]` | Get logs |
| `logs.tail` | `{name, lines?}` | `[...]` | Last N lines |
### IPC Server
```rust
use tokio::net::{UnixListener, UnixStream};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
pub async fn run_ipc_server(
socket_path: PathBuf,
supervisor: Arc<Supervisor>,
) -> Result<(), Error> {
// Remove stale socket
let _ = std::fs::remove_file(&socket_path);
let listener = UnixListener::bind(&socket_path)?;
// Set permissions
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
std::fs::set_permissions(&socket_path,
std::fs::Permissions::from_mode(0o660))?;
}
loop {
let (stream, _) = listener.accept().await?;
let supervisor = supervisor.clone();
tokio::spawn(handle_connection(stream, supervisor));
}
}
async fn handle_connection(stream: UnixStream, supervisor: Arc<Supervisor>) {
let (reader, mut writer) = stream.into_split();
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(line)) = lines.next_line().await {
let response = match serde_json::from_str::<RpcRequest>(&line) {
Ok(request) => {
let result = dispatch(&supervisor, &request).await;
RpcResponse::from_result(request.id, result)
}
Err(e) => RpcResponse::parse_error(e.to_string()),
};
let json = serde_json::to_string(&response).unwrap();
let _ = writer.write_all(json.as_bytes()).await;
let _ = writer.write_all(b"\n").await;
let _ = writer.flush().await;
}
}
```
### Method Dispatch
```rust
/// Route one JSON-RPC request to the matching supervisor operation and build
/// its JSON result.
///
/// Query methods (`service.list`, `service.status`, `service.why`,
/// `service.tree`) only take `graph.read()`.
///
/// # Errors
/// * `RpcError::invalid_params()` if a required parameter is missing or not a
///   string, or if `config` does not deserialize into `ServiceConfig`.
/// * `RpcError::service_not_found(name)` from `service.status` for an unknown name.
/// * `RpcError::method_not_found()` for any method without an arm below.
/// * Errors returned by supervisor operations, converted with `?`.
async fn dispatch(supervisor: &Supervisor, request: &RpcRequest) -> Result<Value, RpcError> {
    match request.method.as_str() {
        "system.ping" => Ok(json!({"version": env!("CARGO_PKG_VERSION")})),
        "system.shutdown" => {
            supervisor.shutdown().await?;
            Ok(json!(true))
        }
        "service.list" => {
            let graph = supervisor.graph.read().await;
            let list: Vec<_> = graph
                .all_services()
                .map(|id| {
                    let s = graph.get(id).unwrap();
                    json!({
                        "name": s.name,
                        "state": s.state.name(),
                        "pid": s.state.pid(),
                    })
                })
                .collect();
            Ok(json!(list))
        }
        "service.status" => {
            let name = get_param_str(request, "name")?;
            let graph = supervisor.graph.read().await;
            // Build the error lazily: only on the not-found path.
            let id = graph
                .get_by_name(name)
                .ok_or_else(|| RpcError::service_not_found(name))?;
            let s = graph.get(id).unwrap();
            Ok(json!({
                "name": s.name,
                "state": s.state.name(),
                "pid": s.state.pid(),
                "is_target": s.is_target,
            }))
        }
        "service.start" => {
            let name = get_param_str(request, "name")?;
            supervisor.start_service(name).await?;
            Ok(json!({"ok": true}))
        }
        "service.stop" => {
            let name = get_param_str(request, "name")?;
            supervisor.stop_service(name).await?;
            Ok(json!({"ok": true}))
        }
        "service.restart" => {
            let name = get_param_str(request, "name")?;
            // NOTE(review): if `stop_service` only moves the service to `Stopping` and
            // returns, `start_service` sees `Stopping` and fails with
            // `TransitionInProgress`. Confirm stop_service waits for the exit.
            supervisor.stop_service(name).await?;
            supervisor.start_service(name).await?;
            Ok(json!({"ok": true}))
        }
        "service.kill" => {
            let name = get_param_str(request, "name")?;
            let signal = request
                .params
                .get("signal")
                .and_then(|v| v.as_str())
                .unwrap_or("SIGTERM");
            supervisor.kill_service(name, signal).await?;
            Ok(json!({"ok": true}))
        }
        "service.why" => {
            let name = get_param_str(request, "name")?;
            let graph = supervisor.graph.read().await;
            // NOTE(review): the method table documents `{blocked, reason, ascii}`,
            // but only `ascii` is returned here.
            let ascii = graph.format_why_blocked(name);
            Ok(json!({"ascii": ascii}))
        }
        "service.tree" => {
            let graph = supervisor.graph.read().await;
            let ascii = graph.format_tree();
            Ok(json!({"ascii": ascii}))
        }
        "service.add" => {
            let config: ServiceConfig = serde_json::from_value(
                request.params.get("config").cloned().unwrap_or_default(),
            )
            .map_err(|_| RpcError::invalid_params())?;
            supervisor.add_service(config).await?;
            Ok(json!({"ok": true}))
        }
        "service.remove" => {
            let name = get_param_str(request, "name")?;
            supervisor.remove_service(name).await?;
            Ok(json!({"ok": true}))
        }
        "service.reload" => {
            let diff = supervisor.reload().await?;
            Ok(json!({
                "added": diff.added,
                "removed": diff.removed,
                "changed": diff.changed,
            }))
        }
        "logs.tail" => {
            let name = get_param_str(request, "name")?;
            // Clamp rather than truncate with `as` on targets where usize is < 64 bits.
            let lines = request
                .params
                .get("lines")
                .and_then(|v| v.as_u64())
                .map(|n| usize::try_from(n).unwrap_or(usize::MAX))
                .unwrap_or(100);
            let logs = supervisor.get_logs(name, lines).await?;
            Ok(json!(logs))
        }
        // NOTE(review): `logs.get` is listed in the method table but has no arm,
        // so it currently falls through to `method_not_found`.
        _ => Err(RpcError::method_not_found()),
    }
}
/// Fetch the string parameter `key` from the request's params.
///
/// # Errors
/// Returns `RpcError::invalid_params()` if `key` is absent or is not a JSON string.
fn get_param_str<'a>(request: &'a RpcRequest, key: &str) -> Result<&'a str, RpcError> {
    request
        .params
        .get(key)
        .and_then(|v| v.as_str())
        // Lazy: the error is only constructed when the parameter is missing.
        .ok_or_else(RpcError::invalid_params)
}
```
## Project Structure
```
zinit-server/src/
├── supervisor/
│ ├── mod.rs # Supervisor struct
│ ├── event_loop.rs # Event handling
│ ├── startup.rs # Initial service startup
│ └── timers.rs # Timeout management
│
└── ipc/
├── mod.rs
├── server.rs # Unix socket listener
└── handlers.rs # Method dispatch
```