1use std::collections::{HashMap, VecDeque};
6use std::process::Stdio;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::{Duration, Instant};
10
11use tokio::io::{AsyncBufReadExt, BufReader};
12use tokio::process::{Child, Command};
13use tokio::sync::Mutex;
14use tracing::{debug, info, warn};
15
16use breaker_machines::CircuitBreaker;
17use state_machines::state_machine;
18
19use crate::charter::{Bind, RouteConfig};
20
/// Maximum number of entries a ship's `LogBuffer` retains before the oldest are evicted.
const MAX_LOG_LINES: usize = 1000;
/// Maximum combined size, in bytes, of all messages retained in a `LogBuffer` (1 MiB).
const MAX_LOG_BYTES: usize = 1 << 20;
/// Maximum stored length, in bytes, of one log line; longer lines are cut and end in `...`.
const MAX_LINE_LENGTH: usize = 4096;

/// Failure threshold handed to each ship's restart circuit breaker.
const RESTART_FAILURE_THRESHOLD: usize = 5;
/// Failure window, in seconds, handed to each ship's restart circuit breaker.
const RESTART_FAILURE_WINDOW_SECS: f64 = 60.0;
/// Seconds a ship stays in backoff; also used as the breaker's half-open timeout.
const RESTART_BACKOFF_SECS: f64 = 30.0;
/// Success threshold handed to each ship's restart circuit breaker.
const RESTART_SUCCESS_THRESHOLD: usize = 1;

/// Seconds after launch during which `Ship::in_grace_period` reports `true`.
const STARTUP_GRACE_SECS: u64 = 10;

/// Seconds `Ship::terminate` waits after SIGTERM before escalating to SIGKILL.
const TERMINATE_TIMEOUT_SECS: u64 = 5;
39
/// Process-wide switch that silences the echo of child output to the parent's terminal.
///
/// Despite the name, the stderr reader consults the same flag. Captured lines are still
/// stored in each ship's log buffer regardless of this setting.
static SUPPRESS_STDOUT: AtomicBool = AtomicBool::new(false);

/// Turns echoing of child output on (`false`) or off (`true`) for every ship.
pub fn set_suppress_stdout(suppress: bool) {
    let flag = &SUPPRESS_STDOUT;
    flag.store(suppress, Ordering::SeqCst);
}

/// Reports whether echoing of child output is currently suppressed.
pub fn is_stdout_suppressed() -> bool {
    let flag = &SUPPRESS_STDOUT;
    flag.load(Ordering::SeqCst)
}
52
53#[derive(Debug, Clone)]
55pub struct LogEntry {
56 pub stream: &'static str,
57 pub message: String,
58}
59
60#[derive(Debug)]
62struct LogBuffer {
63 entries: VecDeque<LogEntry>,
64 total_bytes: usize,
65}
66
67impl LogBuffer {
68 fn new() -> Self {
69 Self {
70 entries: VecDeque::with_capacity(MAX_LOG_LINES),
71 total_bytes: 0,
72 }
73 }
74
75 fn push(&mut self, stream: &'static str, mut message: String) {
76 if message.len() > MAX_LINE_LENGTH {
78 message.truncate(MAX_LINE_LENGTH - 3);
79 message.push_str("...");
80 }
81
82 let msg_len = message.len();
83
84 while self.total_bytes + msg_len > MAX_LOG_BYTES && !self.entries.is_empty() {
86 if let Some(old) = self.entries.pop_front() {
87 self.total_bytes = self.total_bytes.saturating_sub(old.message.len());
88 }
89 }
90
91 while self.entries.len() >= MAX_LOG_LINES {
93 if let Some(old) = self.entries.pop_front() {
94 self.total_bytes = self.total_bytes.saturating_sub(old.message.len());
95 }
96 }
97
98 self.total_bytes += msg_len;
99 self.entries.push_back(LogEntry { stream, message });
100 }
101
102 fn recent(&self, limit: usize) -> Vec<LogEntry> {
103 self.entries.iter().rev().take(limit).cloned().collect()
104 }
105}
106
/// Externally visible lifecycle status of a ship.
///
/// Each variant mirrors the state of the same name in the `ShipLifecycle` state
/// machine; `status_from_state` converts the machine's state name into this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ShipStatus {
    /// Initial state of the lifecycle machine; also the fallback used by
    /// `status_from_state` for an unrecognised state name.
    #[default]
    Pending,
    /// Reached through the `start` event.
    Starting,
    /// Reached through the `healthy` event.
    Running,
    /// Reached through the `unhealthy` event.
    Unhealthy,
    /// Reached through the `backoff` event.
    Backoff,
    /// Reached through the `stop` event.
    Stopped,
    /// Reached through the `crash` event.
    Failed,
}
126
// Lifecycle state machine for a ship, declared with `state_machines::state_machine!`.
//
// The rest of this file drives it through `DynamicShipLifecycle` and
// `ShipLifecycleEvent` (presumably generated by this invocation with
// `dynamic: true` -- confirm against the `state_machines` documentation).
//
// Allowed transitions:
//   start     : Pending | Stopped | Failed | Backoff                            -> Starting
//   healthy   : Starting | Unhealthy                                            -> Running
//   unhealthy : Starting | Running                                              -> Unhealthy
//   crash     : Starting | Running | Unhealthy                                  -> Failed
//   backoff   : Failed | Starting | Running | Unhealthy                         -> Backoff
//   stop      : Pending | Starting | Running | Unhealthy | Backoff | Failed     -> Stopped
//
// Events fired from a state not listed for them are rejected by `handle`;
// `Ship::transition` ignores such rejections.
state_machine! {
    name: ShipLifecycle,
    context: (),
    dynamic: true,

    initial: Pending,
    states: [
        Pending,
        Starting,
        Running,
        Unhealthy,
        Backoff,
        Stopped,
        Failed,
    ],
    events {
        start {
            transition: { from: [Pending, Stopped, Failed, Backoff], to: Starting }
        }
        healthy {
            transition: { from: [Starting, Unhealthy], to: Running }
        }
        unhealthy {
            transition: { from: [Starting, Running], to: Unhealthy }
        }
        crash {
            transition: { from: [Starting, Running, Unhealthy], to: Failed }
        }
        backoff {
            transition: { from: [Failed, Starting, Running, Unhealthy], to: Backoff }
        }
        stop {
            transition: { from: [Pending, Starting, Running, Unhealthy, Backoff, Failed], to: Stopped }
        }
    }
}
163
164#[derive(Debug)]
165struct ShipState {
166 machine: DynamicShipLifecycle,
167 status: ShipStatus,
168 backoff_until: Option<Instant>,
169}
170
171impl ShipState {
172 fn new() -> Self {
173 let machine = DynamicShipLifecycle::new(());
174 let status = status_from_state(machine.current_state());
175 Self {
176 machine,
177 status,
178 backoff_until: None,
179 }
180 }
181
182 fn refresh_status(&mut self) {
183 self.status = status_from_state(self.machine.current_state());
184 }
185}
186
187fn status_from_state(state: &str) -> ShipStatus {
188 match state {
189 "Pending" => ShipStatus::Pending,
190 "Starting" => ShipStatus::Starting,
191 "Running" => ShipStatus::Running,
192 "Unhealthy" => ShipStatus::Unhealthy,
193 "Backoff" => ShipStatus::Backoff,
194 "Stopped" => ShipStatus::Stopped,
195 "Failed" => ShipStatus::Failed,
196 _ => ShipStatus::Pending,
197 }
198}
199
200#[derive(Debug, Clone)]
202pub struct ShipSnapshot {
203 name: String,
204 group: String,
205 command: String,
206 status: ShipStatus,
207 pid: Option<u32>,
208 healthcheck: Option<String>,
209 restart_count: u32,
210 critical: bool,
211 oneshot: bool,
212 routes: Vec<String>,
213}
214
215impl ShipSnapshot {
216 pub fn name(&self) -> &str {
218 &self.name
219 }
220
221 pub fn group(&self) -> &str {
223 &self.group
224 }
225
226 pub fn command(&self) -> &str {
228 &self.command
229 }
230
231 pub fn status(&self) -> ShipStatus {
233 self.status
234 }
235
236 pub fn pid(&self) -> Option<u32> {
238 self.pid
239 }
240
241 pub fn healthcheck(&self) -> Option<&str> {
243 self.healthcheck.as_deref()
244 }
245
246 pub fn restart_count(&self) -> u32 {
248 self.restart_count
249 }
250
251 pub fn is_critical(&self) -> bool {
253 self.critical
254 }
255
256 pub fn is_oneshot(&self) -> bool {
258 self.oneshot
259 }
260
261 pub fn routes(&self) -> &[String] {
263 &self.routes
264 }
265}
266
267#[derive(Default)]
269pub struct ShipBuilder {
270 name: String,
271 group: String,
272 command: String,
273 args: Vec<String>,
274 env: HashMap<String, String>,
275 bind: Option<Bind>,
276 healthcheck: Option<String>,
277 depends_on: Vec<String>,
278 routes: Vec<RouteConfig>,
279 critical: bool,
280 oneshot: bool,
281}
282
283impl ShipBuilder {
284 pub fn new(name: impl Into<String>, command: impl Into<String>) -> Self {
286 Self {
287 name: name.into(),
288 command: command.into(),
289 critical: true, ..Default::default()
291 }
292 }
293
294 pub fn group(mut self, group: impl Into<String>) -> Self {
296 self.group = group.into();
297 self
298 }
299
300 pub fn args(mut self, args: Vec<String>) -> Self {
302 self.args = args;
303 self
304 }
305
306 pub fn env(mut self, env: HashMap<String, String>) -> Self {
308 self.env = env;
309 self
310 }
311
312 pub fn bind(mut self, bind: Option<Bind>) -> Self {
314 self.bind = bind;
315 self
316 }
317
318 pub fn healthcheck(mut self, healthcheck: Option<String>) -> Self {
320 self.healthcheck = healthcheck;
321 self
322 }
323
324 pub fn depends_on(mut self, depends_on: Vec<String>) -> Self {
326 self.depends_on = depends_on;
327 self
328 }
329
330 pub fn routes(mut self, routes: Vec<RouteConfig>) -> Self {
332 self.routes = routes;
333 self
334 }
335
336 pub fn critical(mut self, critical: bool) -> Self {
338 self.critical = critical;
339 self
340 }
341
342 pub fn oneshot(mut self, oneshot: bool) -> Self {
344 self.oneshot = oneshot;
345 self
346 }
347
348 pub fn build(self) -> Ship {
350 let backoff_delay = Duration::from_secs_f64(RESTART_BACKOFF_SECS);
351 let name = self.name;
352 let breaker = CircuitBreaker::builder(format!("ship:{}", name))
353 .failure_threshold(RESTART_FAILURE_THRESHOLD)
354 .failure_window_secs(RESTART_FAILURE_WINDOW_SECS)
355 .half_open_timeout_secs(RESTART_BACKOFF_SECS)
356 .success_threshold(RESTART_SUCCESS_THRESHOLD)
357 .build();
358
359 Ship {
360 name,
361 group: self.group,
362 command: self.command,
363 args: self.args,
364 env: self.env,
365 bind: self.bind,
366 healthcheck: self.healthcheck,
367 depends_on: self.depends_on,
368 routes: self.routes,
369 critical: self.critical,
370 oneshot: self.oneshot,
371 state: Arc::new(Mutex::new(ShipState::new())),
372 child: Arc::new(Mutex::new(None)),
373 process_meta: Arc::new(Mutex::new(ProcessMeta::default())),
374 last_health_check: Arc::new(Mutex::new(None)),
375 restart_count: Arc::new(Mutex::new(0)),
376 log_buffer: Arc::new(Mutex::new(LogBuffer::new())),
377 breaker: Arc::new(Mutex::new(breaker)),
378 backoff_delay,
379 }
380 }
381}
382
/// Bookkeeping about the currently launched child process.
#[derive(Debug, Default)]
struct ProcessMeta {
    /// Process-group id used by `Ship::terminate` to signal the whole group.
    /// `Ship::launch` sets it to the child's PID (the child calls
    /// `setpgid(0, 0)` before exec); `terminate` clears it.
    pgid: Option<i32>,
    /// When the child was launched; read by `Ship::in_grace_period`.
    launched_at: Option<Instant>,
}
391
/// A supervised child process together with its lifecycle state, captured
/// logs and restart circuit breaker.
///
/// Construct one with `ShipBuilder`.
pub struct Ship {
    /// Ship name; used in log output and exported to the child as `MS_SHIP`.
    pub name: String,
    /// Group the ship belongs to (copied into snapshots).
    pub group: String,
    /// Program to execute.
    pub command: String,
    /// Arguments passed to `command`.
    pub args: Vec<String>,
    /// Extra environment variables set for the child.
    pub env: HashMap<String, String>,
    /// Bind configuration; supplies the base for relative health-check endpoints.
    pub bind: Option<Bind>,
    /// Health-check endpoint: a full `http(s)://` URL or a path appended to the bind's base.
    pub healthcheck: Option<String>,
    /// Names of ships this one depends on. Not read in this file --
    /// presumably consumed by the orchestrating code; confirm there.
    pub depends_on: Vec<String>,
    /// Routes associated with the ship (stringified into snapshots).
    pub routes: Vec<RouteConfig>,
    /// Whether the ship is critical (`ShipBuilder::new` defaults this to `true`).
    pub critical: bool,
    /// Whether the ship is a oneshot. Only copied into snapshots in this file.
    pub oneshot: bool,
    /// Lifecycle state machine, cached status and backoff deadline.
    state: Arc<Mutex<ShipState>>,
    /// Handle of the spawned child process, if one has been launched.
    child: Arc<Mutex<Option<Child>>>,
    /// Process-group id and launch time of the current child.
    process_meta: Arc<Mutex<ProcessMeta>>,
    /// Time `mark_healthy` was last called.
    last_health_check: Arc<Mutex<Option<Instant>>>,
    /// Counter bumped by `increment_restart`.
    restart_count: Arc<Mutex<u32>>,
    /// Captured stdout/stderr lines of the child.
    log_buffer: Arc<Mutex<LogBuffer>>,
    /// Circuit breaker fed by health-check results and crashes.
    breaker: Arc<Mutex<CircuitBreaker>>,
    /// Length of the backoff period applied by `enter_backoff`.
    backoff_delay: Duration,
}
433
434impl Ship {
435 pub fn display_name(&self) -> &str {
437 &self.name
438 }
439
440 async fn transition(&self, event: ShipLifecycleEvent) {
441 let mut state = self.state.lock().await;
442 if state.machine.handle(event).is_ok() {
443 state.refresh_status();
444 }
445 }
446
447 pub async fn status(&self) -> ShipStatus {
449 self.state.lock().await.status
450 }
451
452 pub async fn restart_count(&self) -> u32 {
454 *self.restart_count.lock().await
455 }
456
457 pub async fn increment_restart(&self) {
459 let mut count = self.restart_count.lock().await;
460 *count += 1;
461 }
462
463 pub async fn launch(&self) -> anyhow::Result<()> {
465 let name = self.display_name();
466
467 if self.is_running().await {
469 warn!(ship = name, "Ship already running, skipping launch");
470 return Ok(());
471 }
472
473 info!(ship = name, command = %self.command, "Launching ship");
474
475 self.transition(ShipLifecycleEvent::Start).await;
476
477 let mut cmd = Command::new(&self.command);
478
479 let mothership_pid = std::process::id();
481 let socket_dir = std::env::var("MS_SOCKET_DIR").unwrap_or_else(|_| {
482 std::env::var("XDG_RUNTIME_DIR")
483 .unwrap_or_else(|_| std::env::temp_dir().to_string_lossy().to_string())
484 + "/mothership"
485 });
486
487 cmd.args(&self.args)
488 .envs(&self.env)
489 .env("NO_COLOR", "1")
490 .env("MS_PID", mothership_pid.to_string())
491 .env("MS_SHIP", &self.name)
492 .env("MS_SOCKET_DIR", socket_dir)
493 .stdout(Stdio::piped())
494 .stderr(Stdio::piped())
495 .kill_on_drop(true);
496
497 unsafe {
501 cmd.pre_exec(|| {
502 if libc::setpgid(0, 0) != 0 {
505 }
508
509 #[cfg(target_os = "linux")]
511 {
512 if libc::prctl(libc::PR_SET_PDEATHSIG, libc::SIGTERM) != 0 {
513 }
515 if libc::getppid() == 1 {
517 libc::_exit(1);
518 }
519 }
520
521 Ok(())
522 });
523 }
524
525 let mut child = cmd.spawn()?;
526 let pid = child.id();
527
528 if let Some(stdout) = child.stdout.take() {
530 let log_buffer = self.log_buffer.clone();
531 tokio::spawn(async move {
532 let reader = BufReader::new(stdout);
533 let mut lines = reader.lines();
534 while let Ok(Some(line)) = lines.next_line().await {
535 if !SUPPRESS_STDOUT.load(Ordering::SeqCst) {
536 println!("{}", line);
537 }
538 log_buffer.lock().await.push("stdout", line);
539 }
540 });
541 }
542
543 if let Some(stderr) = child.stderr.take() {
544 let log_buffer = self.log_buffer.clone();
545 tokio::spawn(async move {
546 let reader = BufReader::new(stderr);
547 let mut lines = reader.lines();
548 while let Ok(Some(line)) = lines.next_line().await {
549 if !SUPPRESS_STDOUT.load(Ordering::SeqCst) {
550 eprintln!("{}", line);
551 }
552 log_buffer.lock().await.push("stderr", line);
553 }
554 });
555 }
556
557 *self.child.lock().await = Some(child);
559 {
560 let mut meta = self.process_meta.lock().await;
561 meta.pgid = pid.map(|p| p as i32);
562 meta.launched_at = Some(Instant::now());
563 }
564
565 if let Some(p) = pid {
566 info!(ship = name, pid = p, "Ship launched");
567 } else {
568 info!(ship = name, "Ship launched");
569 }
570
571 Ok(())
572 }
573
574 pub async fn in_grace_period(&self) -> bool {
576 let meta = self.process_meta.lock().await;
577 match meta.launched_at {
578 Some(launched_at) => launched_at.elapsed() < Duration::from_secs(STARTUP_GRACE_SECS),
579 None => false,
580 }
581 }
582
583 pub async fn is_running(&self) -> bool {
585 let mut child_guard = self.child.lock().await;
586 if let Some(child) = child_guard.as_mut() {
587 match child.try_wait() {
588 Ok(Some(_)) => false,
589 Ok(None) => true,
590 Err(_) => false,
591 }
592 } else {
593 false
594 }
595 }
596
597 pub async fn wait(&self) -> anyhow::Result<i32> {
599 let mut child_guard = self.child.lock().await;
600 if let Some(child) = child_guard.as_mut() {
601 let status = child.wait().await?;
602 let code = status.code().unwrap_or(-1);
603
604 if code == 0 {
605 self.transition(ShipLifecycleEvent::Stop).await;
606 } else {
607 self.transition(ShipLifecycleEvent::Crash).await;
608 }
609
610 Ok(code)
611 } else {
612 Ok(-1)
613 }
614 }
615
616 pub async fn terminate(&self) -> anyhow::Result<()> {
618 let name = self.display_name();
619 info!(ship = name, "Terminating ship");
620
621 let pgid = self.process_meta.lock().await.pgid;
622 let mut child_guard = self.child.lock().await;
623
624 if let Some(child) = child_guard.as_mut() {
625 use nix::sys::signal::{self, Signal};
626 use nix::unistd::Pid;
627
628 if let Some(pgid) = pgid {
630 debug!(ship = name, pgid = pgid, "Sending SIGTERM to process group");
631 if signal::killpg(Pid::from_raw(pgid), Signal::SIGTERM).is_err()
632 && let Some(pid) = child.id()
633 {
634 let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
635 }
636 } else if let Some(pid) = child.id() {
637 let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGTERM);
638 }
639
640 let timeout = Duration::from_secs(TERMINATE_TIMEOUT_SECS);
642 tokio::select! {
643 _ = tokio::time::sleep(timeout) => {
644 if let Some(pgid) = pgid {
646 debug!(ship = name, pgid = pgid, "Sending SIGKILL to process group");
647 if signal::killpg(Pid::from_raw(pgid), Signal::SIGKILL).is_err()
648 && let Some(pid) = child.id() {
649 let _ = signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL);
650 }
651 }
652 let _ = child.kill().await;
653 }
654 _ = child.wait() => {
655 debug!(ship = name, "Ship exited gracefully");
656 }
657 }
658 }
659
660 {
662 let mut meta = self.process_meta.lock().await;
663 meta.pgid = None;
664 meta.launched_at = None;
665 }
666
667 self.transition(ShipLifecycleEvent::Stop).await;
668 Ok(())
669 }
670
671 pub async fn pid(&self) -> Option<u32> {
673 let child_guard = self.child.lock().await;
674 if let Some(child) = child_guard.as_ref() {
675 child.id()
676 } else {
677 None
678 }
679 }
680
681 pub fn healthcheck_url(&self) -> Option<String> {
683 let endpoint = self.healthcheck.as_ref()?;
684
685 if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
687 return Some(endpoint.clone());
688 }
689
690 let base = self.bind.as_ref()?.healthcheck_base();
691 Some(format!("{}{}", base, endpoint))
692 }
693
694 pub async fn mark_healthy(&self) {
696 {
697 let mut state = self.state.lock().await;
698 if state.machine.handle(ShipLifecycleEvent::Healthy).is_ok() {
699 state.refresh_status();
700 }
701 }
702 *self.last_health_check.lock().await = Some(Instant::now());
703 }
704
705 pub async fn mark_unhealthy(&self) {
707 self.transition(ShipLifecycleEvent::Unhealthy).await;
708 }
709
710 pub async fn mark_failed(&self) {
712 self.transition(ShipLifecycleEvent::Crash).await;
713 }
714
715 pub async fn record_health_success(&self) {
717 let breaker = self.breaker.lock().await;
718 breaker.record_success(0.0);
719 }
720
721 pub async fn record_health_failure(&self) -> bool {
723 let mut breaker = self.breaker.lock().await;
724 breaker.record_failure_and_maybe_trip(0.0);
725 breaker.is_open()
726 }
727
728 pub async fn record_crash(&self) -> bool {
730 let mut breaker = self.breaker.lock().await;
731 breaker.record_failure_and_maybe_trip(0.0);
732 breaker.is_open()
733 }
734
735 pub async fn reset_breaker(&self) {
737 let mut breaker = self.breaker.lock().await;
738 breaker.reset();
739 }
740
741 pub async fn enter_backoff(&self) {
743 let until = Instant::now() + self.backoff_delay;
744 let mut state = self.state.lock().await;
745 if state.machine.handle(ShipLifecycleEvent::Backoff).is_ok() {
746 state.refresh_status();
747 }
748 state.backoff_until = Some(until);
749 }
750
751 pub async fn clear_backoff(&self) {
753 let mut state = self.state.lock().await;
754 state.backoff_until = None;
755 }
756
757 pub async fn is_backing_off(&self) -> bool {
759 self.state.lock().await.status == ShipStatus::Backoff
760 }
761
762 pub async fn backoff_expired(&self) -> bool {
764 let state = self.state.lock().await;
765 match state.backoff_until {
766 Some(until) => Instant::now() >= until,
767 None => true,
768 }
769 }
770
771 pub async fn logs(&self, limit: usize) -> Vec<LogEntry> {
773 self.log_buffer.lock().await.recent(limit)
774 }
775
776 pub async fn snapshot(&self) -> ShipSnapshot {
778 ShipSnapshot {
779 name: self.name.clone(),
780 group: self.group.clone(),
781 command: self.command.clone(),
782 status: self.status().await,
783 pid: self.pid().await,
784 healthcheck: self.healthcheck_url(),
785 restart_count: *self.restart_count.lock().await,
786 critical: self.critical,
787 oneshot: self.oneshot,
788 routes: self.routes.iter().map(|r| r.to_string()).collect(),
789 }
790 }
791}
792
#[cfg(all(test, unix))]
mod tests {
    use std::{collections::HashMap, fs, path::Path, time::Duration};

    use tempfile::tempdir;
    use tokio::time::{Instant, sleep};

    use super::ShipBuilder;

    /// Pause between two polls in the wait helpers.
    const POLL_INTERVAL: Duration = Duration::from_millis(20);
    /// How long the wait helpers poll before panicking.
    const POLL_TIMEOUT: Duration = Duration::from_secs(2);

    /// Reads a positive PID from `path`, or `None` if the file is missing or malformed.
    fn read_pid(path: &Path) -> Option<i32> {
        let contents = fs::read_to_string(path).ok()?;
        let pid = contents.trim().parse::<i32>().ok()?;
        (pid > 0).then_some(pid)
    }

    /// Polls `path` until it contains a positive PID; panics after the timeout.
    async fn wait_for_pid_file(path: &Path) -> i32 {
        let deadline = Instant::now() + POLL_TIMEOUT;
        loop {
            if let Some(pid) = read_pid(path) {
                return pid;
            }
            if Instant::now() >= deadline {
                panic!("timed out waiting for pid file: {}", path.display());
            }
            sleep(POLL_INTERVAL).await;
        }
    }

    /// Probes `pid` with signal 0; only an `ESRCH` failure counts as "does not exist".
    fn process_exists(pid: i32) -> bool {
        let probe = unsafe { libc::kill(pid, 0) };
        // errno is read only when the probe failed, straight after the call.
        probe == 0 || std::io::Error::last_os_error().raw_os_error() != Some(libc::ESRCH)
    }

    /// Polls until `pid` no longer exists; panics after the timeout.
    async fn wait_for_exit(pid: i32) {
        let deadline = Instant::now() + POLL_TIMEOUT;
        loop {
            if !process_exists(pid) {
                return;
            }
            if Instant::now() >= deadline {
                panic!("process {} still running after shutdown", pid);
            }
            sleep(POLL_INTERVAL).await;
        }
    }

    /// A background process started by the ship's shell must die together with
    /// the ship, because `terminate` signals the whole process group.
    #[tokio::test]
    async fn terminate_kills_process_group() {
        let temp = tempdir().expect("temp dir");
        let pid_path = temp.path().join("ship-child.pid");

        let env = HashMap::from([(
            "PID_FILE".to_string(),
            pid_path.to_string_lossy().to_string(),
        )]);

        // The shell backgrounds `sleep`, records its PID and then waits on it.
        let script = "sleep 1000 & echo $! > \"$PID_FILE\"; wait";
        let ship = ShipBuilder::new("test-ship", "sh")
            .args(vec!["-c".to_string(), script.to_string()])
            .env(env)
            .critical(false)
            .build();

        ship.launch().await.expect("launch ship");

        let child_pid = wait_for_pid_file(&pid_path).await;
        ship.terminate().await.expect("terminate ship");

        assert!(!ship.is_running().await, "ship still running");
        wait_for_exit(child_pid).await;
    }
}