zinit 0.3.6

Process supervisor with dependency management
Documentation
# zinit-server - Process & Log Management

Process spawning, signal handling, health checks, and log capture and shipping.

## Process Spawning

```rust
use tokio::process::Command;
use std::process::Stdio;
use nix::unistd::{setpgid, Pid};

impl Supervisor {
    /// Launch the service described by `config` on a background task.
    ///
    /// On a successful spawn, the child's stdout and stderr are each drained
    /// by their own `read_log_stream` task. Once the child exits, a
    /// `SupervisorEvent::ProcessExited` is sent on the supervisor's event
    /// channel. It carries the child's exit code and/or terminating signal.
    ///
    /// If spawning fails, or the exit status cannot be collected, the error is
    /// written to stderr. `ProcessExited` is still sent, with both `exit_code`
    /// and `signal` set to `None`.
    async fn spawn_process(&self, service_id: ServiceId, config: ServiceConfig) {
        let event_tx = self.event_tx.clone();
        let log_buffers = self.log_buffers.clone();
        let log_shipper_tx = self.log_shipper_tx.clone();

        tokio::spawn(async move {
            // Provides `ExitStatus::signal()` (Unix only).
            use std::os::unix::process::ExitStatusExt;

            let (exit_code, signal) = match do_spawn(&config).await {
                Ok((mut child, stdout, stderr)) => {
                    // One reader task per stream keeps both pipes drained.
                    let service_name = config.service.name.clone();
                    tokio::spawn(read_log_stream(
                        service_id,
                        service_name.clone(),
                        LogStream::Stdout,
                        stdout,
                        log_buffers.clone(),
                        log_shipper_tx.clone(),
                    ));
                    tokio::spawn(read_log_stream(
                        service_id,
                        service_name,
                        LogStream::Stderr,
                        stderr,
                        log_buffers,
                        log_shipper_tx,
                    ));

                    match child.wait().await {
                        Ok(status) => (status.code(), status.signal()),
                        Err(e) => {
                            eprintln!(
                                "Failed to wait for service {}: {e}",
                                config.service.name
                            );
                            (None, None)
                        }
                    }
                }
                Err(e) => {
                    eprintln!("Failed to spawn service {}: {e}", config.service.name);
                    (None, None)
                }
            };

            // Delivery is best-effort; a send error is ignored.
            let _ = event_tx
                .send(SupervisorEvent::ProcessExited {
                    service_id,
                    exit_code,
                    signal,
                })
                .await;
        });
    }
}

async fn do_spawn(config: &ServiceConfig) -> Result<(Child, ChildStdout, ChildStderr), Error> {
    let mut cmd = Command::new("sh");
    cmd.args(["-c", &config.service.exec]);
    cmd.current_dir(&config.service.dir);
    cmd.envs(&config.service.env);
    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());
    
    // Create new process group
    unsafe {
        cmd.pre_exec(|| {
            let _ = setpgid(Pid::from_raw(0), Pid::from_raw(0));
            Ok(())
        });
    }
    
    let mut child = cmd.spawn()?;
    let stdout = child.stdout.take().unwrap();
    let stderr = child.stderr.take().unwrap();
    
    Ok((child, stdout, stderr))
}
```

## Signal Handling

```rust
use nix::sys::signal::{kill, Signal};
use nix::unistd::Pid;

/// Send `signal` to the single process `pid`.
///
/// # Errors
/// Returns `EINVAL` if `pid` is 0 or does not fit in a positive `pid_t`.
/// `kill(2)` gives those values special meaning:
/// - 0 means "the caller's own process group";
/// - a negative pid means a process group;
/// - -1 means every process the caller may signal.
///
/// Passing them through could signal the supervisor itself. Other errors
/// come from `kill(2)`.
pub fn send_signal(pid: u32, signal: Signal) -> Result<(), nix::Error> {
    let pid = i32::try_from(pid)
        .ok()
        .filter(|&p| p > 0)
        .ok_or(nix::errno::Errno::EINVAL)?;
    kill(Pid::from_raw(pid), signal)
}

/// Send `signal` to the process group whose id is `pid`, so children spawned
/// by the service receive it too.
///
/// Relies on the service having been started as a process-group leader
/// (pgid == pid).
///
/// # Errors
/// Returns `EINVAL` if `pid` is 0 or does not fit in a positive `pid_t`:
/// - negating 0 targets the caller's own process group;
/// - a truncated value would address an unrelated process;
/// - `i32::MIN` cannot be negated at all.
///
/// Other errors come from `kill(2)`.
pub fn send_signal_to_group(pid: u32, signal: Signal) -> Result<(), nix::Error> {
    let pgid = i32::try_from(pid)
        .ok()
        .filter(|&p| p > 0)
        .ok_or(nix::errno::Errno::EINVAL)?;
    // A negative pid addresses the process group rather than the process.
    kill(Pid::from_raw(-pgid), signal)
}

/// Parse a signal name such as `"TERM"`, `"SIGTERM"` or `" sigterm "`.
///
/// Matching is case-insensitive and ignores surrounding whitespace. At most
/// one leading `SIG` prefix is accepted, so `"SIGSIGTERM"` is rejected.
///
/// # Errors
/// Returns `Error::UnknownSignal` with the normalized (upper-cased,
/// prefix-stripped) name for anything outside the supported set:
/// TERM, KILL, INT, HUP, USR1, USR2, QUIT.
pub fn parse_signal(name: &str) -> Result<Signal, Error> {
    let upper = name.trim().to_uppercase();
    let bare = upper.strip_prefix("SIG").unwrap_or(&upper);
    match bare {
        "TERM" => Ok(Signal::SIGTERM),
        "KILL" => Ok(Signal::SIGKILL),
        "INT" => Ok(Signal::SIGINT),
        "HUP" => Ok(Signal::SIGHUP),
        "USR1" => Ok(Signal::SIGUSR1),
        "USR2" => Ok(Signal::SIGUSR2),
        "QUIT" => Ok(Signal::SIGQUIT),
        other => Err(Error::UnknownSignal(other.to_string())),
    }
}
```

## Health Checks

```rust
use std::time::Duration;

use tokio::net::TcpStream;
use tokio::process::Command;
use tokio::time::timeout;

pub async fn check_health(config: &HealthDef) -> Result<(), HealthError> {
    match config {
        HealthDef::Tcp { target, common } => {
            let dur = Duration::from_millis(common.timeout_ms);
            timeout(dur, TcpStream::connect(target))
                .await
                .map_err(|_| HealthError::Timeout)?
                .map_err(HealthError::Connect)?;
            Ok(())
        }
        HealthDef::Http { target, expect_status, common } => {
            let dur = Duration::from_millis(common.timeout_ms);
            // Simple HTTP GET, check status
            // Use reqwest or hyper
            todo!()
        }
        HealthDef::Exec { target, common } => {
            let dur = Duration::from_millis(common.timeout_ms);
            let output = timeout(dur, Command::new("sh").args(["-c", target]).output())
                .await
                .map_err(|_| HealthError::Timeout)?
                .map_err(HealthError::Exec)?;
            
            if output.status.success() {
                Ok(())
            } else {
                Err(HealthError::NonZeroExit(output.status.code()))
            }
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum HealthError {
    #[error("Timeout")]
    Timeout,
    #[error("Connection failed: {0}")]
    Connect(std::io::Error),
    #[error("Exec failed: {0}")]
    Exec(std::io::Error),
    #[error("Non-zero exit: {0:?}")]
    NonZeroExit(Option<i32>),
}
```

---

## Log Management

### Ring Buffer

```rust
use std::collections::VecDeque;
use std::time::SystemTime;

/// Which output stream of a supervised process a log line was read from.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum LogStream {
    /// The process's standard output.
    Stdout,
    /// The process's standard error.
    Stderr,
}

/// One captured line of process output.
#[derive(Clone, Debug)]
pub struct LogLine {
    /// Wall-clock time at which the line was captured.
    pub timestamp: SystemTime,
    /// Name of the service that produced the line.
    pub service: String,
    /// Stream (stdout or stderr) the line came from.
    pub stream: LogStream,
    /// The line's text.
    pub content: String,
}

/// Bounded in-memory log history that keeps only the most recent `max_lines` lines.
pub struct LogBuffer {
    /// Retained lines, oldest at the front.
    lines: VecDeque<LogLine>,
    /// Maximum number of lines retained; 0 disables retention entirely.
    max_lines: usize,
}

impl LogBuffer {
    /// Create an empty buffer that retains at most `max_lines` lines.
    ///
    /// The up-front allocation is capped at 1000 entries, so a large limit
    /// does not reserve memory before any output arrives.
    pub fn new(max_lines: usize) -> Self {
        Self {
            lines: VecDeque::with_capacity(max_lines.min(1000)),
            max_lines,
        }
    }

    /// Append `line`, evicting the oldest line when the buffer is full.
    ///
    /// With `max_lines == 0` the line is discarded. Without that check it
    /// would be stored and exceed the limit.
    pub fn push(&mut self, line: LogLine) {
        if self.max_lines == 0 {
            return;
        }
        if self.lines.len() >= self.max_lines {
            self.lines.pop_front();
        }
        self.lines.push_back(line);
    }

    /// Iterate over every retained line, oldest first.
    pub fn all(&self) -> impl Iterator<Item = &LogLine> {
        self.lines.iter()
    }

    /// Iterate over the newest `n` lines (or fewer if fewer are retained),
    /// oldest first.
    pub fn last_n(&self, n: usize) -> impl Iterator<Item = &LogLine> {
        let skip = self.lines.len().saturating_sub(n);
        self.lines.iter().skip(skip)
    }
}
```

### Log Reader Task

```rust
use tokio::io::{AsyncBufReadExt, BufReader};

/// Drain one output stream of a supervised process into the log pipeline.
///
/// Each line is:
/// - stamped with `SystemTime::now()`;
/// - appended to the service's `LogBuffer`, if one is registered under
///   `service_id`;
/// - offered to the log shipper without waiting.
///
/// The task runs until EOF or a read error.
///
/// Lines are read as raw bytes and decoded lossily. Output that is not valid
/// UTF-8 is kept (with U+FFFD replacement characters) rather than ending the
/// reader. An early stop would leave the pipe undrained, and a child
/// writing to a full pipe blocks. The trailing `\n` or `\r\n` is removed.
async fn read_log_stream(
    service_id: ServiceId,
    service_name: String,
    stream: LogStream,
    reader: impl tokio::io::AsyncRead + Unpin,
    log_buffers: Arc<RwLock<HashMap<ServiceId, LogBuffer>>>,
    log_shipper_tx: Option<mpsc::Sender<LogLine>>,
) {
    let mut reader = BufReader::new(reader);
    // Reused across iterations instead of allocating a new buffer per line.
    let mut raw = Vec::new();

    loop {
        raw.clear();
        match reader.read_until(b'\n', &mut raw).await {
            Ok(0) | Err(_) => break, // EOF, or the stream is unreadable.
            Ok(_) => {}
        }

        // Drop the line terminator, accepting both "\n" and "\r\n".
        let mut bytes = raw.as_slice();
        if let Some(rest) = bytes.strip_suffix(b"\n") {
            bytes = rest.strip_suffix(b"\r").unwrap_or(rest);
        }

        let line = LogLine {
            timestamp: SystemTime::now(),
            service: service_name.clone(),
            stream,
            content: String::from_utf8_lossy(bytes).into_owned(),
        };

        // Hold the write lock only for the push itself.
        {
            let mut buffers = log_buffers.write().await;
            if let Some(buf) = buffers.get_mut(&service_id) {
                buf.push(line.clone());
            }
        }

        // Best effort: if the shipper's channel is full or closed, drop the
        // line rather than stall this reader (and with it the child's pipe).
        if let Some(tx) = &log_shipper_tx {
            let _ = tx.try_send(line);
        }
    }
}
```

### Log Shipper (Optional)

For forwarding logs to an external collector such as Vector (vector.dev) or similar:

```rust
/// Batches `LogLine`s received on a channel and forwards them to an external
/// collector (e.g. Vector) at `endpoint`.
///
/// A batch is shipped when it reaches `batch_size` lines, or on the next
/// `flush_interval` tick while lines are pending, whichever comes first.
pub struct LogShipper {
    /// Source of log lines; `run` returns once every sender has been dropped.
    rx: mpsc::Receiver<LogLine>,
    /// Destination the batches are sent to.
    endpoint: String,
    /// Number of buffered lines that triggers an immediate ship.
    batch_size: usize,
    /// Period of the timer that flushes a partial batch.
    flush_interval: Duration,
}

impl LogShipper {
    /// Create a shipper with batches of 100 lines and a 5-second flush interval.
    pub fn new(rx: mpsc::Receiver<LogLine>, endpoint: String) -> Self {
        Self {
            rx,
            endpoint,
            batch_size: 100,
            flush_interval: Duration::from_secs(5),
        }
    }

    /// Receive and ship lines until the channel closes.
    ///
    /// When all senders are gone, any pending lines are shipped one final
    /// time and the method returns. Otherwise the loop would keep ticking
    /// an idle timer forever and the task would never finish.
    pub async fn run(mut self) {
        let mut batch = Vec::with_capacity(self.batch_size);
        let mut interval = tokio::time::interval(self.flush_interval);

        loop {
            tokio::select! {
                received = self.rx.recv() => match received {
                    Some(line) => {
                        batch.push(line);
                        if batch.len() >= self.batch_size {
                            self.ship(&mut batch).await;
                        }
                    }
                    None => {
                        if !batch.is_empty() {
                            self.ship(&mut batch).await;
                        }
                        return;
                    }
                },
                _ = interval.tick() => {
                    if !batch.is_empty() {
                        self.ship(&mut batch).await;
                    }
                }
            }
        }
    }

    /// Send `batch` to the endpoint, then clear it.
    ///
    /// A failure is reported on stderr and the batch is dropped without retry,
    /// so a slow or unavailable collector cannot make the backlog grow.
    async fn ship(&self, batch: &mut Vec<LogLine>) {
        if let Err(e) = self.send_batch(batch).await {
            eprintln!("Log shipping failed: {e}");
        }
        batch.clear();
    }

    /// POST `batch` to `self.endpoint`.
    ///
    /// Not implemented yet: calling it panics via `todo!()`.
    async fn send_batch(&self, batch: &[LogLine]) -> Result<(), Error> {
        // TODO: HTTP POST the serialized batch to self.endpoint.
        todo!()
    }
}
```

## Project Structure

```
zinit-server/src/
├── process/
│   ├── mod.rs
│   ├── spawn.rs         # do_spawn, spawn_process
│   ├── signal.rs        # send_signal, send_signal_to_group, parse_signal
│   └── health.rs        # check_health, HealthError
│
└── log/
    ├── mod.rs
    ├── buffer.rs        # LogBuffer, LogLine, LogStream
    ├── reader.rs        # read_log_stream
    └── shipper.rs       # LogShipper (optional)
```