zinit 0.1.0

Process supervisor with dependency management
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
//! State preservation for supervisor restarts.
//!
//! When zinit-server is updated, it preserves:
//! - Running process PIDs (so we don't lose track of services)
//! - Service states (running, failed, restart counts, backoff timers)
//! - Open file descriptors (RPC socket, log pipes)

use std::collections::HashMap;
use std::os::fd::BorrowedFd;
use std::os::unix::io::RawFd;
use std::time::{SystemTime, UNIX_EPOCH};

use nix::fcntl::{FcntlArg, FdFlag, fcntl};
use nix::unistd::Pid;
use serde::{Deserialize, Serialize};

use crate::sdk::{FailureReason, ServiceConfig, ServiceState};

/// Schema version for state serialization.
///
/// `try_load_restore_state` ignores any state file whose `version` is greater than this.
pub const STATE_VERSION: u32 = 1;

/// Schema version for FD map serialization.
pub const FDS_VERSION: u32 = 1;

/// Path where state is persisted during restart.
///
/// Written by `save_state`, read by `try_load_restore_state`, and removed by
/// `cleanup_state_files`.
pub const STATE_PATH: &str = "/run/zinit/state.json";

/// Path where FD map is persisted during restart.
///
/// Written by `save_fds` and removed by `cleanup_state_files`. Note that
/// `try_load_restore_fds` reads the map from the `ZINIT_FDS` environment variable,
/// not from this file.
pub const FDS_PATH: &str = "/run/zinit/fds.json";

/// Maximum age in milliseconds for a state file to be considered valid.
///
/// `try_load_restore_state` compares this against `now_millis() - saved_at` and
/// ignores state that is older.
pub const STATE_MAX_AGE_MS: u64 = 5 * 60 * 1000; // 5 minutes

/// Root state object written to /run/zinit/state.json.
///
/// Serialized to JSON by `save_state` and read back by `try_load_restore_state`,
/// which discards it when `version` is newer than `STATE_VERSION` or when
/// `saved_at` is more than `STATE_MAX_AGE_MS` in the past.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedState {
    /// Schema version for forward compatibility.
    pub version: u32,

    /// Timestamp when state was saved (unix millis).
    pub saved_at: u64,

    /// Server boot time (for uptime calculation).
    ///
    /// NOTE(review): presumably unix millis like `saved_at` — confirm against the writer.
    pub boot_time: u64,

    /// All services and their states.
    ///
    /// NOTE(review): the map key is presumably the service name (duplicating
    /// `PersistedService::name`) — confirm against the writer.
    pub services: HashMap<String, PersistedService>,

    /// Config directory path.
    pub config_dir: String,

    /// Socket path.
    pub socket_path: String,
}

/// Per-service state.
///
/// One entry of `PersistedState::services`; carries the simplified state tag plus
/// the bookkeeping needed to resume supervision after a restart.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedService {
    /// Service name.
    pub name: String,

    /// Current state.
    pub state: PersistedServiceState,

    /// PID if running/starting/stopping.
    pub pid: Option<u32>,

    /// Restart tracking.
    pub restart_count: u32,
    /// Restart delay in milliseconds at the time of saving.
    ///
    /// NOTE(review): presumably the current backoff delay — confirm against the supervisor.
    pub current_restart_delay_ms: u64,

    /// Last exit info.
    pub last_exit_code: Option<i32>,
    /// Signal number from the last exit, if any.
    pub last_exit_signal: Option<i32>,

    /// Timestamps.
    ///
    /// NOTE(review): presumably unix millis, matching `now_millis` — confirm against the writer.
    pub started_at: Option<u64>,
    /// Time of the most recent state transition (same unit assumption as `started_at`).
    pub last_state_change: u64,

    /// Whether this service was added at runtime (not from disk config).
    pub ephemeral: bool,

    /// Original config (for ephemeral services that aren't on disk).
    ///
    /// Omitted from the serialized output entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub config: Option<ServiceConfig>,
}

/// Simplified state enum for serialization.
///
/// A payload-free mirror of `ServiceState`: each variant corresponds to the
/// `ServiceState` variant of the same name with its fields dropped. Variants are
/// serialized in `snake_case` (e.g. `Running` becomes `"running"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum PersistedServiceState {
    Inactive,
    Blocked,
    Starting,
    Running,
    Stopping,
    Exited,
    Failed,
}

impl From<&ServiceState> for PersistedServiceState {
    fn from(state: &ServiceState) -> Self {
        match state {
            ServiceState::Inactive => Self::Inactive,
            ServiceState::Blocked { .. } => Self::Blocked,
            ServiceState::Starting { .. } => Self::Starting,
            ServiceState::Running { .. } => Self::Running,
            ServiceState::Stopping { .. } => Self::Stopping,
            ServiceState::Exited { .. } => Self::Exited,
            ServiceState::Failed { .. } => Self::Failed,
        }
    }
}

impl PersistedServiceState {
    /// Rebuild a full [`ServiceState`] from the persisted tag and an optional PID.
    ///
    /// States that need a PID degrade when none is supplied:
    /// - `Starting` without a PID becomes `Inactive`.
    /// - `Running` / `Stopping` without a PID become `Exited` with no exit code.
    ///
    /// Information that was not persisted is filled with placeholders: `Blocked`
    /// gets an empty `waiting_on` list, `Exited` gets `exit_code: None`, and
    /// `Failed` gets a synthetic `SpawnError` reason.
    pub fn into_service_state(self, pid: Option<u32>) -> ServiceState {
        match (self, pid) {
            (Self::Inactive, _) => ServiceState::Inactive,

            // The dependency list is not persisted; it will be recalculated.
            (Self::Blocked, _) => ServiceState::Blocked {
                waiting_on: Vec::new(),
            },

            (Self::Starting, Some(pid)) => ServiceState::Starting { pid },
            (Self::Starting, None) => ServiceState::Inactive,

            (Self::Running, Some(pid)) => ServiceState::Running { pid },
            (Self::Running, None) => ServiceState::Exited { exit_code: None },

            (Self::Stopping, Some(pid)) => ServiceState::Stopping { pid },
            (Self::Stopping, None) => ServiceState::Exited { exit_code: None },

            (Self::Exited, _) => ServiceState::Exited { exit_code: None },

            (Self::Failed, _) => ServiceState::Failed {
                reason: FailureReason::SpawnError {
                    message: "restored from failed state".into(),
                },
            },
        }
    }
}

/// FD mapping written to /run/zinit/fds.json (or passed via ZINIT_FDS env).
///
/// All descriptors are stored as raw integer FD numbers (`RawFd`). `save_fds`
/// writes this structure as JSON; `try_load_restore_fds` parses it from the
/// `ZINIT_FDS` environment variable.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PersistedFds {
    /// Schema version.
    pub version: u32,

    /// RPC listener socket.
    pub rpc_socket: Option<RawFd>,

    /// Per-service FDs.
    ///
    /// NOTE(review): presumably keyed by service name — confirm against the writer.
    pub services: HashMap<String, ServiceFds>,

    /// Socket-activated listeners (service_name -> fd).
    pub socket_activated: HashMap<String, RawFd>,
}

/// FDs associated with a service.
///
/// Each field is `None` when no descriptor was recorded for that stream.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceFds {
    /// Read end of stdout pipe (we read from this).
    pub stdout_pipe: Option<RawFd>,

    /// Read end of stderr pipe.
    pub stderr_pipe: Option<RawFd>,
}

/// Result of validating a restored PID.
///
/// Returned by `validate_pid`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PidStatus {
    /// Process is still alive and matches expected binary.
    Alive,
    /// Process no longer exists.
    Dead,
    /// PID was recycled by a different process.
    WrongProcess,
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, the elapsed duration is
/// treated as zero and `0` is returned instead of failing.
pub fn now_millis() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        // Clock is set before 1970: fall back to a zero duration.
        Err(_) => std::time::Duration::default(),
    };

    since_epoch.as_millis() as u64
}

/// Clear the CLOEXEC flag so FD survives across process spawn.
///
/// Reads the descriptor flags with `F_GETFD`, removes `FD_CLOEXEC`, and writes the
/// result back with `F_SETFD`; all other descriptor flags are preserved.
///
/// # Errors
/// Returns an `InvalidInput` error if `fd` is negative, and otherwise wraps any
/// error reported by `fcntl` (for example when `fd` is not an open descriptor).
///
/// # Safety
/// The caller must ensure the fd is a valid file descriptor.
pub fn clear_cloexec(fd: RawFd) -> Result<(), std::io::Error> {
    // `BorrowedFd::borrow_raw` must never be handed -1 (its reserved niche value),
    // and no negative number is a usable descriptor, so reject them up front.
    if fd < 0 {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("invalid file descriptor: {}", fd),
        ));
    }

    // SAFETY: fd is non-negative (checked above) and the caller guarantees it is
    // open. The borrow is only used for the two fcntl calls below.
    let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };

    let flags = fcntl(borrowed, FcntlArg::F_GETFD).map_err(std::io::Error::other)?;

    // Set difference: keep every flag except FD_CLOEXEC.
    let new_flags = FdFlag::from_bits_truncate(flags) - FdFlag::FD_CLOEXEC;

    // `BorrowedFd` is `Copy`, so the same borrow can be reused for the second call.
    fcntl(borrowed, FcntlArg::F_SETFD(new_flags)).map_err(std::io::Error::other)?;

    Ok(())
}

/// Check if an FD is still valid.
///
/// Returns `true` when `fcntl(fd, F_GETFD)` succeeds, i.e. `fd` currently refers to
/// an open descriptor in this process. Negative values are never valid descriptors
/// and return `false` without touching the OS.
pub fn is_fd_valid(fd: RawFd) -> bool {
    // `BorrowedFd::borrow_raw` must never be handed -1 (its reserved niche value).
    // A negative fd cannot be open anyway, so answer directly.
    if fd < 0 {
        return false;
    }

    // SAFETY: fd is non-negative (checked above). It may still not be open — that
    // is exactly what is being probed — but the borrow is used only for a single
    // F_GETFD query, which reports a closed descriptor as an error.
    let borrowed = unsafe { BorrowedFd::borrow_raw(fd) };
    fcntl(borrowed, FcntlArg::F_GETFD).is_ok()
}

/// Validate if a PID is still alive and running the expected process.
///
/// `expected_exec` is the service's command line; only its first
/// whitespace-separated token (the binary) is used for matching.
///
/// Returns:
/// - [`PidStatus::Dead`] if `pid` is 0, does not fit in a positive `i32`, or the
///   existence probe (`kill` with no signal) fails.
/// - [`PidStatus::Alive`] if `/proc/<pid>/exe` or, failing that,
///   `/proc/<pid>/cmdline` matches the expected binary.
/// - [`PidStatus::WrongProcess`] if the process exists but neither matches.
///
/// Matching is substring-based, so an empty or whitespace-only `expected_exec`
/// matches any live process whose `/proc` entries are readable.
pub fn validate_pid(pid: u32, expected_exec: &str) -> PidStatus {
    use std::fs;

    // A plain `as i32` cast would wrap large values to negative numbers, and
    // `kill` treats 0 and negative PIDs as "process group" / "all processes"
    // rather than a single process. Neither can be a restored service PID.
    let raw_pid = match i32::try_from(pid) {
        Ok(raw) if raw > 0 => raw,
        _ => return PidStatus::Dead,
    };

    // Check if process exists (signal `None` performs only the existence check).
    if nix::sys::signal::kill(Pid::from_raw(raw_pid), None).is_err() {
        return PidStatus::Dead;
    }

    // Check if it's the right process by examining /proc/<pid>/exe or /proc/<pid>/cmdline
    let exe_path = format!("/proc/{}/exe", pid);
    let cmdline_path = format!("/proc/{}/cmdline", pid);

    // The binary is the first token of the configured command line.
    let expected_binary = expected_exec
        .split_whitespace()
        .next()
        .unwrap_or(expected_exec);

    // Try exe symlink first. Either string may contain the other: the configured
    // binary can be a bare name while the link is an absolute path, or vice versa.
    if let Ok(exe) = fs::read_link(&exe_path) {
        let exe_str = exe.to_string_lossy();

        if exe_str.contains(expected_binary) || expected_binary.contains(&*exe_str) {
            return PidStatus::Alive;
        }
    }

    // Fall back to cmdline. Read raw bytes and convert lossily so that a
    // non-UTF-8 argument does not make the whole check fail.
    if let Ok(cmdline) = fs::read(&cmdline_path) {
        // Arguments are NUL-separated in /proc/<pid>/cmdline.
        let cmdline_clean = String::from_utf8_lossy(&cmdline).replace('\0', " ");

        if cmdline_clean.contains(expected_binary) {
            return PidStatus::Alive;
        }
    }

    // Process exists but doesn't match - PID was recycled
    PidStatus::WrongProcess
}

/// Try to load restore state from disk.
///
/// Reads and parses `STATE_PATH`. Returns `None` when:
/// - the file does not exist (normal cold start; not logged),
/// - the file cannot be read or is not valid JSON for [`PersistedState`] (logged),
/// - its `version` is greater than `STATE_VERSION` (logged), or
/// - it was saved more than `STATE_MAX_AGE_MS` ago (logged).
pub fn try_load_restore_state() -> Option<PersistedState> {
    let json = match std::fs::read_to_string(STATE_PATH) {
        Ok(json) => json,
        // No state file simply means there is nothing to restore.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
        Err(e) => {
            tracing::warn!("failed to read restore state {}: {}", STATE_PATH, e);
            return None;
        }
    };

    let state: PersistedState = match serde_json::from_str(&json) {
        Ok(state) => state,
        Err(e) => {
            tracing::warn!("failed to parse restore state {}: {}", STATE_PATH, e);
            return None;
        }
    };

    // Version check
    if state.version > STATE_VERSION {
        tracing::warn!(
            "state version {} > supported {}, ignoring",
            state.version,
            STATE_VERSION
        );
        return None;
    }

    // Staleness check. `saturating_sub` makes a `saved_at` in the future count as
    // age 0 instead of underflowing.
    let age_ms = now_millis().saturating_sub(state.saved_at);
    if age_ms > STATE_MAX_AGE_MS {
        tracing::warn!("restore state is {} seconds old, ignoring", age_ms / 1000);
        return None;
    }

    Some(state)
}

/// Try to load FD map from environment variable.
pub fn try_load_restore_fds() -> Option<PersistedFds> {
    let fd_json = std::env::var("ZINIT_FDS").ok()?;
    serde_json::from_str(&fd_json).ok()
}

/// Remove the persisted state and FD-map files once a restore has completed.
///
/// Best effort: removal errors (including the file not existing) are ignored.
pub fn cleanup_state_files() {
    for path in [STATE_PATH, FDS_PATH] {
        let _ = std::fs::remove_file(path);
    }
}

/// Persist `state` to `STATE_PATH` as pretty-printed JSON.
///
/// The JSON is first written to `<STATE_PATH>.tmp` and then renamed over the final
/// path, so a reader never sees a partially written file.
///
/// # Errors
/// Returns an error if the directory cannot be created, serialization fails, or
/// the write or rename fails.
pub fn save_state(state: &PersistedState) -> Result<(), std::io::Error> {
    // The runtime directory may not exist yet.
    std::fs::create_dir_all("/run/zinit")?;

    let payload = serde_json::to_string_pretty(state)?;
    let staging_path = format!("{}.tmp", STATE_PATH);

    // Stage, then publish with a rename.
    std::fs::write(&staging_path, &payload)
        .and_then(|_| std::fs::rename(&staging_path, STATE_PATH))
}

/// Save FD map to disk.
///
/// Serializes `fds` as pretty-printed JSON, writes it to `<FDS_PATH>.tmp`, and
/// renames it over `FDS_PATH` so a reader never sees a partially written file.
/// The parent directory is created if it does not exist, so this does not depend
/// on `save_state` having run first.
///
/// # Errors
/// Returns an error if the directory cannot be created, serialization fails, or
/// the write or rename fails.
pub fn save_fds(fds: &PersistedFds) -> Result<(), std::io::Error> {
    // Ensure directory exists
    if let Some(dir) = std::path::Path::new(FDS_PATH).parent() {
        std::fs::create_dir_all(dir)?;
    }

    let json = serde_json::to_string_pretty(fds)?;

    // Write atomically (write to tmp, rename)
    let tmp_path = format!("{}.tmp", FDS_PATH);
    std::fs::write(&tmp_path, &json)?;
    std::fs::rename(&tmp_path, FDS_PATH)?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// `PersistedState` survives a JSON serialize/deserialize cycle.
    #[test]
    fn test_persisted_state_roundtrip() {
        let state = PersistedState {
            version: STATE_VERSION,
            saved_at: now_millis(),
            boot_time: now_millis() - 1000,
            services: HashMap::new(),
            config_dir: crate::sdk::socket::system_config_dir()
                .to_string_lossy()
                .to_string(),
            socket_path: crate::sdk::socket::system_path()
                .to_string_lossy()
                .to_string(),
        };

        let json = serde_json::to_string(&state).unwrap();
        let restored: PersistedState = serde_json::from_str(&json).unwrap();

        assert_eq!(state.version, restored.version);
        assert_eq!(state.saved_at, restored.saved_at);
        assert_eq!(state.config_dir, restored.config_dir);
    }

    /// `PersistedService` survives a JSON round trip, including with `config`
    /// omitted from the output (it is `None` here).
    #[test]
    fn test_persisted_service_roundtrip() {
        let service = PersistedService {
            name: "test".to_string(),
            state: PersistedServiceState::Running,
            pid: Some(12345),
            restart_count: 2,
            current_restart_delay_ms: 2000,
            last_exit_code: Some(0),
            last_exit_signal: None,
            started_at: Some(now_millis() - 5000),
            last_state_change: now_millis(),
            ephemeral: false,
            config: None,
        };

        let json = serde_json::to_string(&service).unwrap();
        let restored: PersistedService = serde_json::from_str(&json).unwrap();

        assert_eq!(service.name, restored.name);
        assert_eq!(service.pid, restored.pid);
        assert_eq!(service.state, restored.state);
    }

    /// Conversions between `ServiceState` and `PersistedServiceState` in both
    /// directions, including the PID-less fallback for `Running`.
    #[test]
    fn test_persisted_service_state_conversion() {
        // Test all state conversions
        assert_eq!(
            PersistedServiceState::from(&ServiceState::Inactive),
            PersistedServiceState::Inactive
        );
        assert_eq!(
            PersistedServiceState::from(&ServiceState::Running { pid: 123 }),
            PersistedServiceState::Running
        );
        assert_eq!(
            PersistedServiceState::from(&ServiceState::Starting { pid: 123 }),
            PersistedServiceState::Starting
        );

        // Test conversion back
        let running = PersistedServiceState::Running.into_service_state(Some(456));
        assert!(matches!(running, ServiceState::Running { pid: 456 }));

        let running_no_pid = PersistedServiceState::Running.into_service_state(None);
        assert!(matches!(running_no_pid, ServiceState::Exited { .. }));
    }

    /// A PID that cannot exist is reported as `Dead`.
    #[test]
    fn test_pid_validation_dead() {
        // `i32::MAX` is above the largest PID the kernel will ever hand out, so
        // this cannot collide with a live process. A smaller "unlikely" value
        // such as 999999 can exist on systems with a raised `pid_max`.
        assert_eq!(
            validate_pid(i32::MAX as u32, "nonexistent"),
            PidStatus::Dead
        );
    }

    /// `PersistedFds` survives a JSON serialize/deserialize cycle.
    #[test]
    fn test_persisted_fds_roundtrip() {
        let fds = PersistedFds {
            version: FDS_VERSION,
            rpc_socket: Some(5),
            services: HashMap::from([(
                "app".to_string(),
                ServiceFds {
                    stdout_pipe: Some(7),
                    stderr_pipe: Some(8),
                },
            )]),
            socket_activated: HashMap::new(),
        };

        let json = serde_json::to_string(&fds).unwrap();
        let restored: PersistedFds = serde_json::from_str(&json).unwrap();

        assert_eq!(fds.version, restored.version);
        assert_eq!(fds.rpc_socket, restored.rpc_socket);
        assert_eq!(fds.services.len(), restored.services.len());
    }
}