# zinit-server - Process & Log Management
Process spawning, signal handling, and log capture.
## Process Spawning
```rust
use tokio::process::Command;
use std::process::Stdio;
use nix::unistd::{setpgid, Pid};
impl Supervisor {
async fn spawn_process(&self, service_id: ServiceId, config: ServiceConfig) {
let event_tx = self.event_tx.clone();
let log_buffers = self.log_buffers.clone();
let log_shipper_tx = self.log_shipper_tx.clone();
tokio::spawn(async move {
match do_spawn(&config).await {
Ok((mut child, stdout, stderr)) => {
let pid = child.id().unwrap_or(0);
// Spawn log readers
let service_name = config.service.name.clone();
tokio::spawn(read_log_stream(
service_id,
service_name.clone(),
LogStream::Stdout,
stdout,
log_buffers.clone(),
log_shipper_tx.clone(),
));
tokio::spawn(read_log_stream(
service_id,
service_name,
LogStream::Stderr,
stderr,
log_buffers,
log_shipper_tx,
));
// Wait for exit
let status = child.wait().await;
let (exit_code, signal) = match status {
Ok(s) => (s.code(), s.signal()),
Err(_) => (None, None),
};
// Notify supervisor
let _ = event_tx.send(SupervisorEvent::ProcessExited {
service_id,
exit_code,
signal,
}).await;
}
Err(e) => {
let _ = event_tx.send(SupervisorEvent::ProcessExited {
service_id,
exit_code: None,
signal: None,
}).await;
}
}
});
}
}
async fn do_spawn(config: &ServiceConfig) -> Result<(Child, ChildStdout, ChildStderr), Error> {
let mut cmd = Command::new("sh");
cmd.args(["-c", &config.service.exec]);
cmd.current_dir(&config.service.dir);
cmd.envs(&config.service.env);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
// Create new process group
unsafe {
cmd.pre_exec(|| {
let _ = setpgid(Pid::from_raw(0), Pid::from_raw(0));
Ok(())
});
}
let mut child = cmd.spawn()?;
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
Ok((child, stdout, stderr))
}
```
## Signal Handling
```rust
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;
pub fn send_signal(pid: u32, signal: Signal) -> Result<(), nix::Error> {
kill(Pid::from_raw(pid as i32), signal)
}
/// Send to process group (kills children too)
pub fn send_signal_to_group(pid: u32, signal: Signal) -> Result<(), nix::Error> {
kill(Pid::from_raw(-(pid as i32)), signal)
}
pub fn parse_signal(name: &str) -> Result<Signal, Error> {
match name.to_uppercase().trim_start_matches("SIG") {
"TERM" => Ok(Signal::SIGTERM),
"KILL" => Ok(Signal::SIGKILL),
"INT" => Ok(Signal::SIGINT),
"HUP" => Ok(Signal::SIGHUP),
"USR1" => Ok(Signal::SIGUSR1),
"USR2" => Ok(Signal::SIGUSR2),
"QUIT" => Ok(Signal::SIGQUIT),
other => Err(Error::UnknownSignal(other.to_string())),
}
}
```
## Health Checks
```rust
use tokio::net::TcpStream;
use tokio::time::timeout;
use std::time::Duration;
pub async fn check_health(config: &HealthDef) -> Result<(), HealthError> {
match config {
HealthDef::Tcp { target, common } => {
let dur = Duration::from_millis(common.timeout_ms);
timeout(dur, TcpStream::connect(target))
.await
.map_err(|_| HealthError::Timeout)?
.map_err(HealthError::Connect)?;
Ok(())
}
HealthDef::Http { target, expect_status, common } => {
let dur = Duration::from_millis(common.timeout_ms);
// Simple HTTP GET, check status
// Use reqwest or hyper
todo!()
}
HealthDef::Exec { target, common } => {
let dur = Duration::from_millis(common.timeout_ms);
let output = timeout(dur, Command::new("sh").args(["-c", target]).output())
.await
.map_err(|_| HealthError::Timeout)?
.map_err(HealthError::Exec)?;
if output.status.success() {
Ok(())
} else {
Err(HealthError::NonZeroExit(output.status.code()))
}
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum HealthError {
#[error("Timeout")]
Timeout,
#[error("Connection failed: {0}")]
Connect(std::io::Error),
#[error("Exec failed: {0}")]
Exec(std::io::Error),
#[error("Non-zero exit: {0:?}")]
NonZeroExit(Option<i32>),
}
```
---
## Log Management
### Ring Buffer
```rust
use std::collections::VecDeque;
use std::time::SystemTime;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LogStream {
Stdout,
Stderr,
}
#[derive(Debug, Clone)]
pub struct LogLine {
pub timestamp: SystemTime,
pub service: String,
pub stream: LogStream,
pub content: String,
}
pub struct LogBuffer {
lines: VecDeque<LogLine>,
max_lines: usize,
}
impl LogBuffer {
pub fn new(max_lines: usize) -> Self {
Self {
lines: VecDeque::with_capacity(max_lines.min(1000)),
max_lines,
}
}
pub fn push(&mut self, line: LogLine) {
if self.lines.len() >= self.max_lines {
self.lines.pop_front();
}
self.lines.push_back(line);
}
pub fn all(&self) -> impl Iterator<Item = &LogLine> {
self.lines.iter()
}
pub fn last_n(&self, n: usize) -> impl Iterator<Item = &LogLine> {
let skip = self.lines.len().saturating_sub(n);
self.lines.iter().skip(skip)
}
}
```
### Log Reader Task
```rust
use tokio::io::{AsyncBufReadExt, BufReader};
async fn read_log_stream(
service_id: ServiceId,
service_name: String,
stream: LogStream,
reader: impl tokio::io::AsyncRead + Unpin,
log_buffers: Arc<RwLock<HashMap<ServiceId, LogBuffer>>>,
log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
let mut lines = BufReader::new(reader).lines();
while let Ok(Some(content)) = lines.next_line().await {
let line = LogLine {
timestamp: SystemTime::now(),
service: service_name.clone(),
stream,
content,
};
// Write to buffer (separate lock, brief)
{
let mut buffers = log_buffers.write().await;
if let Some(buf) = buffers.get_mut(&service_id) {
buf.push(line.clone());
}
}
// Forward to shipper (fire and forget)
if let Some(tx) = &log_shipper_tx {
let _ = tx.try_send(line);
}
}
}
```
### Log Shipper (Optional)
For forwarding to vector.io or similar:
```rust
pub struct LogShipper {
rx: mpsc::Receiver<LogLine>,
endpoint: String,
batch_size: usize,
flush_interval: Duration,
}
impl LogShipper {
pub fn new(rx: mpsc::Receiver<LogLine>, endpoint: String) -> Self {
Self {
rx,
endpoint,
batch_size: 100,
flush_interval: Duration::from_secs(5),
}
}
pub async fn run(mut self) {
let mut batch = Vec::with_capacity(self.batch_size);
let mut interval = tokio::time::interval(self.flush_interval);
loop {
tokio::select! {
Some(line) = self.rx.recv() => {
batch.push(line);
if batch.len() >= self.batch_size {
self.ship(&mut batch).await;
}
}
_ = interval.tick() => {
if !batch.is_empty() {
self.ship(&mut batch).await;
}
}
}
}
}
async fn ship(&self, batch: &mut Vec<LogLine>) {
// Send to vector.io / HTTP endpoint
// On failure: log warning, drop batch (backpressure)
match self.send_batch(batch).await {
Ok(_) => {}
Err(e) => {
eprintln!("Log shipping failed: {e}");
}
}
batch.clear();
}
async fn send_batch(&self, batch: &[LogLine]) -> Result<(), Error> {
// HTTP POST to self.endpoint
todo!()
}
}
```
## Project Structure
```
zinit-server/src/
├── process/
│ ├── mod.rs
│ ├── spawn.rs # do_spawn, spawn_process
│ ├── signal.rs # send_signal, parse_signal
│ └── health.rs # check_health, HealthError
│
└── log/
├── mod.rs
├── buffer.rs # LogBuffer, LogLine
├── reader.rs # read_log_stream
└── shipper.rs # LogShipper (optional)
```