mothership/fleet/
ship.rs

1//! Ship - A single process in the fleet
2//!
3//! All ships are equal peers in the fleet.
4
5use std::collections::{HashMap, VecDeque};
6use std::process::Stdio;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant};
10
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14use tracing::{debug, info, warn};
15
16use breaker_machines::CircuitBreaker;
17use state_machines::state_machine;
18
19use crate::charter::{Bind, RouteConfig};
20
/// Maximum log lines to retain per ship
const MAX_LOG_LINES: usize = 1000;
/// Maximum log bytes to retain per ship (1MB)
const MAX_LOG_BYTES: usize = 1024 * 1024;
/// Maximum length per log line in bytes (longer lines are truncated)
const MAX_LINE_LENGTH: usize = 4096;

/// Restart circuit defaults (crash loop protection).
/// Passed to the circuit breaker builder as the failure threshold…
const RESTART_FAILURE_THRESHOLD: usize = 5;
/// …measured over this rolling window (seconds)…
const RESTART_FAILURE_WINDOW_SECS: f64 = 60.0;
/// …with this half-open timeout; also used as the restart backoff delay (seconds).
const RESTART_BACKOFF_SECS: f64 = 30.0;
/// Successes required for the breaker to close again.
const RESTART_SUCCESS_THRESHOLD: usize = 1;

/// Grace period after launch before health checks trigger restarts (seconds)
const STARTUP_GRACE_SECS: u64 = 10;

/// Graceful shutdown timeout before SIGKILL (seconds)
const TERMINATE_TIMEOUT_SECS: u64 = 5;
39
/// Global flag to suppress stdout/stderr passthrough of ship logs
/// (set while a TUI owns the terminal).
static SUPPRESS_STDOUT: AtomicBool = AtomicBool::new(false);

/// Enable or disable suppression of console log passthrough.
pub fn set_suppress_stdout(suppress: bool) {
    SUPPRESS_STDOUT.store(suppress, Ordering::SeqCst);
}

/// Whether console log passthrough is currently suppressed.
pub fn is_stdout_suppressed() -> bool {
    SUPPRESS_STDOUT.load(Ordering::SeqCst)
}
52
/// A single captured log line from a ship's stdout or stderr.
#[derive(Debug, Clone)]
pub struct LogEntry {
    /// Which stream produced the line: "stdout" or "stderr".
    pub stream: &'static str,
    /// The line text (truncated to `MAX_LINE_LENGTH` by the log buffer).
    pub message: String,
}
59
60/// Bounded log buffer with byte and line limits
61#[derive(Debug)]
62struct LogBuffer {
63    entries: VecDeque<LogEntry>,
64    total_bytes: usize,
65}
66
67impl LogBuffer {
68    fn new() -> Self {
69        Self {
70            entries: VecDeque::with_capacity(MAX_LOG_LINES),
71            total_bytes: 0,
72        }
73    }
74
75    fn push(&mut self, stream: &'static str, mut message: String) {
76        // Truncate long lines
77        if message.len() > MAX_LINE_LENGTH {
78            message.truncate(MAX_LINE_LENGTH - 3);
79            message.push_str("...");
80        }
81
82        let msg_len = message.len();
83
84        // Evict old entries if we'd exceed byte limit
85        while self.total_bytes + msg_len > MAX_LOG_BYTES && !self.entries.is_empty() {
86            if let Some(old) = self.entries.pop_front() {
87                self.total_bytes = self.total_bytes.saturating_sub(old.message.len());
88            }
89        }
90
91        // Evict if we'd exceed line limit
92        while self.entries.len() >= MAX_LOG_LINES {
93            if let Some(old) = self.entries.pop_front() {
94                self.total_bytes = self.total_bytes.saturating_sub(old.message.len());
95            }
96        }
97
98        self.total_bytes += msg_len;
99        self.entries.push_back(LogEntry { stream, message });
100    }
101
102    fn recent(&self, limit: usize) -> Vec<LogEntry> {
103        self.entries.iter().rev().take(limit).cloned().collect()
104    }
105}
106
/// Current status of a ship
///
/// Mirrors the `ShipLifecycle` state machine states one-to-one; derived
/// from the machine's current state via `status_from_state`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShipStatus {
    /// Not yet started
    #[default]
    Pending,
    /// Starting up, waiting for health check
    Starting,
    /// Running and healthy
    Running,
    /// Running but health check failed
    Unhealthy,
    /// Restart backoff (circuit open)
    Backoff,
    /// Process exited cleanly
    Stopped,
    /// Process crashed
    Failed,
}
126
// Lifecycle state machine for a ship. With `dynamic: true` this generates
// the `DynamicShipLifecycle` type used by `ShipState`, and each event below
// corresponds to a `ShipLifecycleEvent` variant (Start, Healthy, Unhealthy,
// Crash, Backoff, Stop). The states mirror the `ShipStatus` enum.
state_machine! {
    name: ShipLifecycle,
    context: (),
    dynamic: true,

    initial: Pending,
    states: [
        Pending,
        Starting,
        Running,
        Unhealthy,
        Backoff,
        Stopped,
        Failed,
    ],
    events {
        start {
            transition: { from: [Pending, Stopped, Failed, Backoff], to: Starting }
        }
        healthy {
            transition: { from: [Starting, Unhealthy], to: Running }
        }
        unhealthy {
            transition: { from: [Starting, Running], to: Unhealthy }
        }
        crash {
            transition: { from: [Starting, Running, Unhealthy], to: Failed }
        }
        backoff {
            transition: { from: [Failed, Starting, Running, Unhealthy], to: Backoff }
        }
        stop {
            transition: { from: [Pending, Starting, Running, Unhealthy, Backoff, Failed], to: Stopped }
        }
    }
}
163
164#[derive(Debug)]
165struct ShipState {
166    machine: DynamicShipLifecycle,
167    status: ShipStatus,
168    backoff_until: Option<Instant>,
169}
170
171impl ShipState {
172    fn new() -> Self {
173        let machine = DynamicShipLifecycle::new(());
174        let status = status_from_state(machine.current_state());
175        Self {
176            machine,
177            status,
178            backoff_until: None,
179        }
180    }
181
182    fn refresh_status(&mut self) {
183        self.status = status_from_state(self.machine.current_state());
184    }
185}
186
187fn status_from_state(state: &str) -> ShipStatus {
188    match state {
189        "Pending" => ShipStatus::Pending,
190        "Starting" => ShipStatus::Starting,
191        "Running" => ShipStatus::Running,
192        "Unhealthy" => ShipStatus::Unhealthy,
193        "Backoff" => ShipStatus::Backoff,
194        "Stopped" => ShipStatus::Stopped,
195        "Failed" => ShipStatus::Failed,
196        _ => ShipStatus::Pending,
197    }
198}
199
/// Snapshot of ship state for sync access (e.g., TUI)
///
/// All fields are copied out of the live `Ship`, so a snapshot remains
/// usable (but may grow stale) after the ship's state changes.
#[derive(Debug, Clone)]
pub struct ShipSnapshot {
    // Identity and static configuration, cloned from the ship.
    name: String,
    group: String,
    command: String,
    // Runtime state captured at snapshot time.
    status: ShipStatus,
    pid: Option<u32>,
    healthcheck: Option<String>,
    restart_count: u32,
    critical: bool,
    oneshot: bool,
    // Routes rendered to display strings via `RouteConfig::to_string`.
    routes: Vec<String>,
}

impl ShipSnapshot {
    /// Get ship name
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Get ship group
    pub fn group(&self) -> &str {
        &self.group
    }

    /// Get command
    pub fn command(&self) -> &str {
        &self.command
    }

    /// Get status
    pub fn status(&self) -> ShipStatus {
        self.status
    }

    /// Get PID if running
    pub fn pid(&self) -> Option<u32> {
        self.pid
    }

    /// Get healthcheck URL
    pub fn healthcheck(&self) -> Option<&str> {
        self.healthcheck.as_deref()
    }

    /// Get restart count
    pub fn restart_count(&self) -> u32 {
        self.restart_count
    }

    /// Is this ship critical (crash kills fleet)?
    pub fn is_critical(&self) -> bool {
        self.critical
    }

    /// Is this a oneshot job?
    pub fn is_oneshot(&self) -> bool {
        self.oneshot
    }

    /// Get routes (as display strings)
    pub fn routes(&self) -> &[String] {
        &self.routes
    }
}
266
/// Builder for Ship
///
/// Construct with [`ShipBuilder::new`], chain the setters, then call
/// [`ShipBuilder::build`].
#[derive(Default)]
pub struct ShipBuilder {
    name: String,
    group: String,
    command: String,
    args: Vec<String>,
    env: HashMap<String, String>,
    bind: Option<Bind>,
    healthcheck: Option<String>,
    depends_on: Vec<String>,
    routes: Vec<RouteConfig>,
    // NOTE: `new` overrides this to true; the derived Default alone
    // would leave it false.
    critical: bool,
    oneshot: bool,
}
282
283impl ShipBuilder {
284    /// Create a new ship builder
285    pub fn new(name: impl Into<String>, command: impl Into<String>) -> Self {
286        Self {
287            name: name.into(),
288            command: command.into(),
289            critical: true, // Default: all ships are critical
290            ..Default::default()
291        }
292    }
293
294    /// Set the fleet group
295    pub fn group(mut self, group: impl Into<String>) -> Self {
296        self.group = group.into();
297        self
298    }
299
300    /// Set command arguments
301    pub fn args(mut self, args: Vec<String>) -> Self {
302        self.args = args;
303        self
304    }
305
306    /// Set environment variables
307    pub fn env(mut self, env: HashMap<String, String>) -> Self {
308        self.env = env;
309        self
310    }
311
312    /// Set bind address
313    pub fn bind(mut self, bind: Option<Bind>) -> Self {
314        self.bind = bind;
315        self
316    }
317
318    /// Set healthcheck endpoint
319    pub fn healthcheck(mut self, healthcheck: Option<String>) -> Self {
320        self.healthcheck = healthcheck;
321        self
322    }
323
324    /// Set dependencies
325    pub fn depends_on(mut self, depends_on: Vec<String>) -> Self {
326        self.depends_on = depends_on;
327        self
328    }
329
330    /// Set HTTP routes
331    pub fn routes(mut self, routes: Vec<RouteConfig>) -> Self {
332        self.routes = routes;
333        self
334    }
335
336    /// Set whether this ship is critical (crash kills fleet)
337    pub fn critical(mut self, critical: bool) -> Self {
338        self.critical = critical;
339        self
340    }
341
342    /// Set whether this is a oneshot job
343    pub fn oneshot(mut self, oneshot: bool) -> Self {
344        self.oneshot = oneshot;
345        self
346    }
347
348    /// Build the Ship
349    pub fn build(self) -> Ship {
350        let backoff_delay = Duration::from_secs_f64(RESTART_BACKOFF_SECS);
351        let name = self.name;
352        let breaker = CircuitBreaker::builder(format!("ship:{}", name))
353            .failure_threshold(RESTART_FAILURE_THRESHOLD)
354            .failure_window_secs(RESTART_FAILURE_WINDOW_SECS)
355            .half_open_timeout_secs(RESTART_BACKOFF_SECS)
356            .success_threshold(RESTART_SUCCESS_THRESHOLD)
357            .build();
358
359        Ship {
360            name,
361            group: self.group,
362            command: self.command,
363            args: self.args,
364            env: self.env,
365            bind: self.bind,
366            healthcheck: self.healthcheck,
367            depends_on: self.depends_on,
368            routes: self.routes,
369            critical: self.critical,
370            oneshot: self.oneshot,
371            state: Arc::new(Mutex::new(ShipState::new())),
372            child: Arc::new(Mutex::new(None)),
373            process_meta: Arc::new(Mutex::new(ProcessMeta::default())),
374            last_health_check: Arc::new(Mutex::new(None)),
375            restart_count: Arc::new(Mutex::new(0)),
376            log_buffer: Arc::new(Mutex::new(LogBuffer::new())),
377            breaker: Arc::new(Mutex::new(breaker)),
378            backoff_delay,
379        }
380    }
381}
382
/// Process metadata for proper cleanup
///
/// Kept separate from the `Child` handle so signal targeting and
/// grace-period checks don't need to take the child lock.
#[derive(Debug, Default)]
struct ProcessMeta {
    /// Process group ID (same as child PID when using setpgid(0,0))
    pgid: Option<i32>,
    /// Time of last launch (for grace period)
    launched_at: Option<Instant>,
}
391
/// A ship in the fleet (process wrapper)
///
/// Owns the spawned child process plus all supervision state (lifecycle
/// machine, restart circuit breaker, bounded log buffer). Mutable state
/// lives behind `tokio::sync::Mutex`es so `&self` methods can be called
/// from concurrent tasks.
pub struct Ship {
    /// Ship name (unique identifier)
    pub name: String,
    /// Fleet group this ship belongs to
    pub group: String,
    /// Command to run
    pub command: String,
    /// Command arguments
    pub args: Vec<String>,
    /// Environment variables
    pub env: HashMap<String, String>,
    /// Bind address (for health check inference)
    pub bind: Option<Bind>,
    /// Health check endpoint
    pub healthcheck: Option<String>,
    /// Dependencies (other ship names)
    pub depends_on: Vec<String>,
    /// HTTP routes (bind name -> pattern)
    pub routes: Vec<RouteConfig>,
    /// Critical ship - crash kills fleet (default: true)
    pub critical: bool,
    /// Oneshot job - runs once then exits (default: false)
    pub oneshot: bool,
    /// Current lifecycle state
    state: Arc<Mutex<ShipState>>,
    /// Child process handle
    child: Arc<Mutex<Option<Child>>>,
    /// Process metadata (pgid, launch time)
    process_meta: Arc<Mutex<ProcessMeta>>,
    /// Last health check time
    last_health_check: Arc<Mutex<Option<Instant>>>,
    /// Restart count
    restart_count: Arc<Mutex<u32>>,
    /// Log buffer for TUI display (bounded by bytes and lines)
    log_buffer: Arc<Mutex<LogBuffer>>,
    /// Crash loop circuit breaker
    breaker: Arc<Mutex<CircuitBreaker>>,
    /// Backoff delay when circuit opens
    backoff_delay: Duration,
}
433
impl Ship {
    /// Get display name
    pub fn display_name(&self) -> &str {
        &self.name
    }

    /// Fire a lifecycle event and, if the transition is valid, refresh the
    /// cached `ShipStatus`. Invalid transitions are silently ignored.
    async fn transition(&self, event: ShipLifecycleEvent) {
        let mut state = self.state.lock().await;
        if state.machine.handle(event).is_ok() {
            state.refresh_status();
        }
    }
446
447    /// Get current status
448    pub async fn status(&self) -> ShipStatus {
449        self.state.lock().await.status
450    }
451
452    /// Get restart count
453    pub async fn restart_count(&self) -> u32 {
454        *self.restart_count.lock().await
455    }
456
457    /// Increment restart count
458    pub async fn increment_restart(&self) {
459        let mut count = self.restart_count.lock().await;
460        *count += 1;
461    }
462
    /// Launch the ship (spawn process)
    ///
    /// Spawns the configured command in its own process group, forwards its
    /// stdout/stderr into the bounded log buffer (and the console unless
    /// suppressed), and records the pgid/launch time for later cleanup.
    /// Idempotent: a second call while the process is alive is a no-op.
    pub async fn launch(&self) -> anyhow::Result<()> {
        let name = self.display_name();

        // Guard against double launch
        if self.is_running().await {
            warn!(ship = name, "Ship already running, skipping launch");
            return Ok(());
        }

        info!(ship = name, command = %self.command, "Launching ship");

        self.transition(ShipLifecycleEvent::Start).await;

        let mut cmd = Command::new(&self.command);

        // Inject Mothership protocol env vars for ships that need them
        let mothership_pid = std::process::id();
        // Socket dir resolution order: $MS_SOCKET_DIR, then
        // $XDG_RUNTIME_DIR/mothership, then <tmpdir>/mothership.
        let socket_dir = std::env::var("MS_SOCKET_DIR").unwrap_or_else(|_| {
            std::env::var("XDG_RUNTIME_DIR")
                .unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().to_string())
                + "/mothership"
        });

        cmd.args(&self.args)
            .envs(&self.env)
            // Output is captured and logged; ANSI color codes are unwanted.
            .env("NO_COLOR", "1")
            .env("MS_PID", mothership_pid.to_string())
            .env("MS_SHIP", &self.name)
            .env("MS_SOCKET_DIR", socket_dir)
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            // Last-resort cleanup if the Child handle is ever dropped.
            .kill_on_drop(true);

        // Set up process group and death signal
        // SAFETY: pre_exec runs after fork, before exec. We only call
        // async-signal-safe functions (setpgid, prctl, getppid, _exit).
        unsafe {
            cmd.pre_exec(|| {
                // Create new process group with this process as leader
                // This allows us to kill all children with killpg()
                if libc::setpgid(0, 0) != 0 {
                    // Non-fatal: continue even if setpgid fails
                    // (can happen if already process group leader)
                }

                // Linux: request SIGTERM when parent dies
                #[cfg(target_os = "linux")]
                {
                    if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM) != 0 {
                        // Non-fatal: some environments don't allow this
                    }
                    // Check if parent already died (race condition)
                    if libc::getppid() == 1 {
                        libc::_exit(1);
                    }
                }

                Ok(())
            });
        }

        let mut child = cmd.spawn()?;
        let pid = child.id();

        // Spawn log forwarders - raw passthrough, no transformation
        if let Some(stdout) = child.stdout.take() {
            let log_buffer = self.log_buffer.clone();
            tokio::spawn(async move {
                let reader = BufReader::new(stdout);
                let mut lines = reader.lines();
                // Task ends when the pipe closes (process exit) or a read errors.
                while let Ok(Some(line)) = lines.next_line().await {
                    if !SUPPRESS_STDOUT.load(Ordering::SeqCst) {
                        println!("{}", line);
                    }
                    log_buffer.lock().await.push("stdout", line);
                }
            });
        }

        if let Some(stderr) = child.stderr.take() {
            let log_buffer = self.log_buffer.clone();
            tokio::spawn(async move {
                let reader = BufReader::new(stderr);
                let mut lines = reader.lines();
                while let Ok(Some(line)) = lines.next_line().await {
                    if !SUPPRESS_STDOUT.load(Ordering::SeqCst) {
                        eprintln!("{}", line);
                    }
                    log_buffer.lock().await.push("stderr", line);
                }
            });
        }

        // Store child and metadata
        *self.child.lock().await = Some(child);
        {
            let mut meta = self.process_meta.lock().await;
            // pgid == child pid because pre_exec called setpgid(0, 0).
            meta.pgid = pid.map(|p| p as i32);
            meta.launched_at = Some(Instant::now());
        }

        if let Some(p) = pid {
            info!(ship = name, pid = p, "Ship launched");
        } else {
            info!(ship = name, "Ship launched");
        }

        Ok(())
    }
573
574    /// Check if ship is within startup grace period
575    pub async fn in_grace_period(&self) -> bool {
576        let meta = self.process_meta.lock().await;
577        match meta.launched_at {
578            Some(launched_at) => launched_at.elapsed() < Duration::from_secs(STARTUP_GRACE_SECS),
579            None => false,
580        }
581    }
582
583    /// Check if process is still running
584    pub async fn is_running(&self) -> bool {
585        let mut child_guard = self.child.lock().await;
586        if let Some(child) = child_guard.as_mut() {
587            match child.try_wait() {
588                Ok(Some(_)) => false,
589                Ok(None) => true,
590                Err(_) => false,
591            }
592        } else {
593            false
594        }
595    }
596
597    /// Wait for process to exit
598    pub async fn wait(&self) -> anyhow::Result<i32> {
599        let mut child_guard = self.child.lock().await;
600        if let Some(child) = child_guard.as_mut() {
601            let status = child.wait().await?;
602            let code = status.code().unwrap_or(-1);
603
604            if code == 0 {
605                self.transition(ShipLifecycleEvent::Stop).await;
606            } else {
607                self.transition(ShipLifecycleEvent::Crash).await;
608            }
609
610            Ok(code)
611        } else {
612            Ok(-1)
613        }
614    }
615
    /// Terminate the process and its entire process group
    ///
    /// Sends SIGTERM to the process group (falling back to the child PID if
    /// killpg fails), waits up to `TERMINATE_TIMEOUT_SECS` for a graceful
    /// exit, then escalates to SIGKILL. Always clears process metadata and
    /// transitions the lifecycle to Stopped.
    pub async fn terminate(&self) -> anyhow::Result<()> {
        let name = self.display_name();
        info!(ship = name, "Terminating ship");

        // Copy pgid out before taking the child lock; the meta lock is
        // released immediately.
        let pgid = self.process_meta.lock().await.pgid;
        let mut child_guard = self.child.lock().await;

        if let Some(child) = child_guard.as_mut() {
            use nix::sys::signal::{self, Signal};
            use nix::unistd::Pid;

            // Try graceful shutdown first (SIGTERM to process group)
            if let Some(pgid) = pgid {
                debug!(ship = name, pgid = pgid, "Sending SIGTERM to process group");
                if signal::killpg(Pid::from_raw(pgid), Signal::SIGTERM).is_err()
                    && let Some(pid) = child.id()
                {
                    // Group signal failed; target the direct child instead.
                    let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
                }
            } else if let Some(pid) = child.id() {
                let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
            }

            // Wait briefly for graceful shutdown
            let timeout = Duration::from_secs(TERMINATE_TIMEOUT_SECS);
            tokio::select! {
                _ = tokio::time::sleep(timeout) => {
                    // Force kill process group if still running
                    if let Some(pgid) = pgid {
                        debug!(ship = name, pgid = pgid, "Sending SIGKILL to process group");
                        if signal::killpg(Pid::from_raw(pgid), Signal::SIGKILL).is_err()
                            && let Some(pid) = child.id() {
                                let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL);
                            }
                    }
                    // kill() also awaits the exit, reaping the direct child.
                    let _ = child.kill().await;
                }
                _ = child.wait() => {
                    debug!(ship = name, "Ship exited gracefully");
                }
            }
        }

        // Clear process metadata
        {
            let mut meta = self.process_meta.lock().await;
            meta.pgid = None;
            meta.launched_at = None;
        }

        self.transition(ShipLifecycleEvent::Stop).await;
        Ok(())
    }
670
671    /// Get PID if running
672    pub async fn pid(&self) -> Option<u32> {
673        let child_guard = self.child.lock().await;
674        if let Some(child) = child_guard.as_ref() {
675            child.id()
676        } else {
677            None
678        }
679    }
680
681    /// Get healthcheck URL if configured
682    pub fn healthcheck_url(&self) -> Option<String> {
683        let endpoint = self.healthcheck.as_ref()?;
684
685        // If endpoint is already a full URL, use it directly
686        if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
687            return Some(endpoint.clone());
688        }
689
690        let base = self.bind.as_ref()?.healthcheck_base();
691        Some(format!("{}{}", base, endpoint))
692    }
693
694    /// Mark ship as healthy
695    pub async fn mark_healthy(&self) {
696        {
697            let mut state = self.state.lock().await;
698            if state.machine.handle(ShipLifecycleEvent::Healthy).is_ok() {
699                state.refresh_status();
700            }
701        }
702        *self.last_health_check.lock().await = Some(Instant::now());
703    }
704
705    /// Mark ship as unhealthy
706    pub async fn mark_unhealthy(&self) {
707        self.transition(ShipLifecycleEvent::Unhealthy).await;
708    }
709
710    /// Mark ship as failed (crash)
711    pub async fn mark_failed(&self) {
712        self.transition(ShipLifecycleEvent::Crash).await;
713    }
714
715    /// Record a healthy check (for rate-based thresholds)
716    pub async fn record_health_success(&self) {
717        let breaker = self.breaker.lock().await;
718        breaker.record_success(0.0);
719    }
720
721    /// Record an unhealthy check, returns true if circuit is open
722    pub async fn record_health_failure(&self) -> bool {
723        let mut breaker = self.breaker.lock().await;
724        breaker.record_failure_and_maybe_trip(0.0);
725        breaker.is_open()
726    }
727
728    /// Record a crash against the restart circuit, returns true if circuit is open
729    pub async fn record_crash(&self) -> bool {
730        let mut breaker = self.breaker.lock().await;
731        breaker.record_failure_and_maybe_trip(0.0);
732        breaker.is_open()
733    }
734
735    /// Reset the restart circuit (clears failure history)
736    pub async fn reset_breaker(&self) {
737        let mut breaker = self.breaker.lock().await;
738        breaker.reset();
739    }
740
741    /// Enter restart backoff state
742    pub async fn enter_backoff(&self) {
743        let until = Instant::now() + self.backoff_delay;
744        let mut state = self.state.lock().await;
745        if state.machine.handle(ShipLifecycleEvent::Backoff).is_ok() {
746            state.refresh_status();
747        }
748        state.backoff_until = Some(until);
749    }
750
751    /// Clear restart backoff state
752    pub async fn clear_backoff(&self) {
753        let mut state = self.state.lock().await;
754        state.backoff_until = None;
755    }
756
757    /// Check if ship is currently backing off
758    pub async fn is_backing_off(&self) -> bool {
759        self.state.lock().await.status == ShipStatus::Backoff
760    }
761
762    /// Check if backoff delay has elapsed
763    pub async fn backoff_expired(&self) -> bool {
764        let state = self.state.lock().await;
765        match state.backoff_until {
766            Some(until) => Instant::now() >= until,
767            None => true,
768        }
769    }
770
    /// Get recent logs
    ///
    /// Returns up to `limit` entries, newest first.
    pub async fn logs(&self, limit: usize) -> Vec<LogEntry> {
        self.log_buffer.lock().await.recent(limit)
    }

    /// Take a snapshot of current state
    ///
    /// NOTE(review): fields are read under separate, sequential lock
    /// acquisitions (status, pid, restart count), so a snapshot taken
    /// during concurrent updates is not guaranteed to be internally
    /// consistent — acceptable for display purposes.
    pub async fn snapshot(&self) -> ShipSnapshot {
        ShipSnapshot {
            name: self.name.clone(),
            group: self.group.clone(),
            command: self.command.clone(),
            status: self.status().await,
            pid: self.pid().await,
            healthcheck: self.healthcheck_url(),
            restart_count: *self.restart_count.lock().await,
            critical: self.critical,
            oneshot: self.oneshot,
            routes: self.routes.iter().map(|r| r.to_string()).collect(),
        }
    }
}
792
#[cfg(all(test, unix))]
mod tests {
    use std::{collections::HashMap, fs, path::Path, time::Duration};

    use tempfile::tempdir;
    use tokio::time::{Instant, sleep};

    use super::ShipBuilder;

    /// Poll until `path` contains a positive PID, panicking after ~2s.
    async fn wait_for_pid_file(path: &Path) -> i32 {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            if let Ok(contents) = fs::read_to_string(path)
                && let Ok(pid) = contents.trim().parse::<i32>()
                && pid > 0
            {
                return pid;
            }
            if Instant::now() >= deadline {
                panic!("timed out waiting for pid file: {}", path.display());
            }
            sleep(Duration::from_millis(20)).await;
        }
    }

    /// True if a process with `pid` still exists (signal-0 probe).
    fn process_exists(pid: i32) -> bool {
        // SAFETY: kill with signal 0 performs permission/existence checks
        // only; no signal is delivered.
        let result = unsafe { libc::kill(pid, 0) };
        if result == 0 {
            return true;
        }
        // Errors other than ESRCH (e.g. EPERM) still mean the process exists.
        std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
    }

    /// Poll until the process disappears, panicking after ~2s.
    async fn wait_for_exit(pid: i32) {
        let deadline = Instant::now() + Duration::from_secs(2);
        loop {
            if !process_exists(pid) {
                return;
            }
            if Instant::now() >= deadline {
                panic!("process {} still running after shutdown", pid);
            }
            sleep(Duration::from_millis(20)).await;
        }
    }

    /// Launching `sh -c "sleep 1000 & …; wait"` creates a grandchild
    /// process; terminate() must kill the whole process group, not just
    /// the shell it spawned directly.
    #[tokio::test]
    async fn terminate_kills_process_group() {
        let temp = tempdir().expect("temp dir");
        let pid_path = temp.path().join("ship-child.pid");

        let mut env = HashMap::new();
        env.insert(
            "PID_FILE".to_string(),
            pid_path.to_string_lossy().to_string(),
        );

        // The shell writes its background child's PID to $PID_FILE.
        let script = "sleep 1000 & echo $! > \"$PID_FILE\"; wait";
        let ship = ShipBuilder::new("test-ship", "sh")
            .args(vec!["-c".to_string(), script.to_string()])
            .env(env)
            .critical(false)
            .build();

        ship.launch().await.expect("launch ship");

        let child_pid = wait_for_pid_file(&pid_path).await;
        ship.terminate().await.expect("terminate ship");

        assert!(!ship.is_running().await, "ship still running");
        wait_for_exit(child_pid).await;
    }
}