1use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::fmt;
9use std::fs::{self, OpenOptions};
10use std::io::{self, Write};
11use std::path::PathBuf;
12use std::process::{Command, Stdio};
13use std::thread;
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
16pub fn version() -> &'static str {
18 env!("CARGO_PKG_VERSION")
19}
20
/// Lifecycle status recorded for a managed server.
///
/// Serialized in lowercase (`running` / `stopped`) via `rename_all`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ServerStatus {
    /// The state file records the server as running.
    Running,
    /// The server is stopped; this is also the default when no state exists.
    Stopped,
}
28
29impl fmt::Display for ServerStatus {
30 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31 match self {
32 ServerStatus::Running => write!(f, "running"),
33 ServerStatus::Stopped => write!(f, "stopped"),
34 }
35 }
36}
37
/// Result of a call to `RuntimeManager::start`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartOutcome {
    /// A new process was spawned and recorded in the state file.
    Started,
    /// The pid already recorded in the state file was still alive; nothing was spawned.
    AlreadyRunning,
}
44
/// Result of a call to `RuntimeManager::stop`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopOutcome {
    /// The recorded pid was alive or the state said `Running` when `stop` was called.
    Stopped,
    /// Neither a live pid nor a `Running` status was recorded.
    AlreadyStopped,
}
51
/// Describes how a server process is launched.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessSpec {
    /// Program to execute; `RuntimeManager::start` rejects a blank value.
    pub command: String,
    /// Arguments passed to `command`.
    pub args: Vec<String>,
    /// Environment variables set on the child process.
    pub env: BTreeMap<String, String>,
    /// Auto-restart policy; `None` means no automatic restarts.
    pub auto_restart: Option<AutoRestartPolicy>,
    /// On Unix, applied to the child as both soft and hard `RLIMIT_AS`;
    /// ignored on other platforms.
    #[serde(default)]
    pub max_memory_bytes: Option<u64>,
    /// On Unix, applied to the child as both soft and hard `RLIMIT_NOFILE`;
    /// ignored on other platforms.
    #[serde(default)]
    pub max_file_descriptors: Option<u64>,
}
64
/// Controls whether and how often a crashed server is restarted.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AutoRestartPolicy {
    /// Whether automatic restarts are performed at all.
    pub enabled: bool,
    /// Upper bound on restart attempts; the counter is reset by `start` and `stop`.
    pub max_restarts: u32,
}
71
/// Per-server state persisted as TOML in `<berth_home>/runtime/<server>.toml`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct RuntimeState {
    /// Last recorded lifecycle status.
    status: ServerStatus,
    /// Seconds since the Unix epoch at the time of the last update.
    updated_at_epoch_secs: u64,
    /// Pid of the spawned process, if one is recorded.
    #[serde(default)]
    pid: Option<u32>,
    /// Command used for the most recent spawn.
    #[serde(default)]
    command: Option<String>,
    /// Arguments used for the most recent spawn.
    #[serde(default)]
    args: Vec<String>,
    /// Copied from the spec's auto-restart policy by `start`.
    #[serde(default)]
    auto_restart_enabled: bool,
    /// Copied from the spec's auto-restart policy by `start`.
    #[serde(default)]
    max_restarts: u32,
    /// Number of automatic restarts performed since the last `start`/`stop`.
    #[serde(default)]
    restart_attempts: u32,
}
89
/// Shape of `<berth_home>/policy.toml` as far as the runtime reads it.
#[derive(Debug, Default, Deserialize)]
struct RuntimePolicyFile {
    /// The `[servers]` table; defaults to empty when absent.
    #[serde(default)]
    servers: RuntimePolicyServers,
}
95
/// The `[servers]` table of the policy file.
#[derive(Debug, Default, Deserialize)]
struct RuntimePolicyServers {
    /// Server names that must not be auto-restarted; `*` matches every server
    /// and names are compared ASCII case-insensitively.
    #[serde(default)]
    deny: Vec<String>,
}
101
/// One record of the audit log, written as a single JSON line to
/// `<berth_home>/audit/audit.jsonl` with camelCase keys.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AuditEvent {
    /// Seconds since the Unix epoch when the event was recorded.
    timestamp_epoch_secs: u64,
    /// Name of the server the event refers to.
    server: String,
    /// Event kind, e.g. `start`, `stop`, `exit`, `auto-restart`, `policy-denied`.
    action: String,
    /// Pid involved in the event; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pid: Option<u32>,
    /// Command involved in the event; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,
    /// Arguments involved in the event; omitted from the JSON when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    args: Option<Vec<String>>,
}
115
116impl Default for RuntimeState {
117 fn default() -> Self {
118 RuntimeState {
119 status: ServerStatus::Stopped,
120 updated_at_epoch_secs: now_epoch_secs(),
121 pid: None,
122 command: None,
123 args: Vec::new(),
124 auto_restart_enabled: false,
125 max_restarts: 0,
126 restart_attempts: 0,
127 }
128 }
129}
130
/// Manages server processes and their on-disk state, logs and audit trail.
pub struct RuntimeManager {
    /// Root directory holding `runtime/`, `logs/`, `audit/` and `policy.toml`.
    berth_home: PathBuf,
}
134
135impl RuntimeManager {
136 pub fn new<P: Into<PathBuf>>(berth_home: P) -> Self {
138 RuntimeManager {
139 berth_home: berth_home.into(),
140 }
141 }
142
143 pub fn status(&self, server: &str) -> io::Result<ServerStatus> {
145 self.status_with_spec(server, None)
146 }
147
148 pub fn status_with_spec(
150 &self,
151 server: &str,
152 spec: Option<&ProcessSpec>,
153 ) -> io::Result<ServerStatus> {
154 let mut state = self.read_state(server)?;
155
156 if state.status == ServerStatus::Running {
157 let old_pid = state.pid;
158 let old_command = state.command.clone();
159 let old_args = state.args.clone();
160
161 if let Some(pid) = state.pid {
162 if process_is_alive(pid) {
163 return Ok(ServerStatus::Running);
164 }
165 }
166
167 let expects_external_supervisor = spec
168 .and_then(|s| s.auto_restart)
169 .is_some_and(|policy| policy.enabled)
170 && !state.auto_restart_enabled;
171 if expects_external_supervisor
172 && self.wait_for_supervisor_replacement(server, old_pid)?
173 {
174 return Ok(ServerStatus::Running);
175 }
176
177 state.status = ServerStatus::Stopped;
179 state.pid = None;
180 state.updated_at_epoch_secs = now_epoch_secs();
181 self.write_state(server, &state)?;
182 self.append_log(server, "EXIT")?;
183 self.append_audit_event(AuditEvent {
184 timestamp_epoch_secs: now_epoch_secs(),
185 server: server.to_string(),
186 action: "exit".to_string(),
187 pid: old_pid,
188 command: old_command.clone(),
189 args: if old_args.is_empty() {
190 None
191 } else {
192 Some(old_args.clone())
193 },
194 })?;
195
196 if state.auto_restart_enabled && state.restart_attempts < state.max_restarts {
198 if let Some(spec) = spec {
199 if self.server_denied_by_policy(server)? {
200 state.status = ServerStatus::Stopped;
201 state.pid = None;
202 state.updated_at_epoch_secs = now_epoch_secs();
203 self.write_state(server, &state)?;
204 self.append_log(server, "POLICY_DENIED_AUTO_RESTART")?;
205 self.append_audit_event(AuditEvent {
206 timestamp_epoch_secs: now_epoch_secs(),
207 server: server.to_string(),
208 action: "policy-denied".to_string(),
209 pid: old_pid,
210 command: old_command,
211 args: if old_args.is_empty() {
212 None
213 } else {
214 Some(old_args)
215 },
216 })?;
217 return Ok(ServerStatus::Stopped);
218 }
219
220 let mut cmd = Command::new(&spec.command);
221 cmd.args(&spec.args)
222 .envs(&spec.env)
223 .stdin(Stdio::null())
224 .stdout(Stdio::from(self.open_log_append(server)?))
225 .stderr(Stdio::from(self.open_log_append(server)?));
226 apply_resource_limits(
227 &mut cmd,
228 spec.max_memory_bytes,
229 spec.max_file_descriptors,
230 );
231 let child = cmd.spawn().map_err(|e| {
232 io::Error::new(e.kind(), format!("failed to spawn process: {e}"))
233 })?;
234 let pid = child.id();
235 drop(child);
236
237 state.status = ServerStatus::Running;
238 state.pid = Some(pid);
239 state.command = Some(spec.command.clone());
240 state.args = spec.args.clone();
241 state.restart_attempts += 1;
242 state.updated_at_epoch_secs = now_epoch_secs();
243 self.write_state(server, &state)?;
244 self.append_log(
245 server,
246 &format!(
247 "AUTO_RESTART pid={pid} attempt={}/{}",
248 state.restart_attempts, state.max_restarts
249 ),
250 )?;
251 self.append_audit_event(AuditEvent {
252 timestamp_epoch_secs: now_epoch_secs(),
253 server: server.to_string(),
254 action: "auto-restart".to_string(),
255 pid: Some(pid),
256 command: Some(spec.command.clone()),
257 args: if spec.args.is_empty() {
258 None
259 } else {
260 Some(spec.args.clone())
261 },
262 })?;
263 return Ok(ServerStatus::Running);
264 }
265 }
266 }
267
268 Ok(ServerStatus::Stopped)
269 }
270
271 fn wait_for_supervisor_replacement(
273 &self,
274 server: &str,
275 old_pid: Option<u32>,
276 ) -> io::Result<bool> {
277 for _ in 0..10 {
278 thread::sleep(Duration::from_millis(50));
279 let state = self.read_state(server)?;
280 if state.status != ServerStatus::Running {
281 return Ok(false);
282 }
283 if let Some(pid) = state.pid {
284 if Some(pid) != old_pid && process_is_alive(pid) {
285 return Ok(true);
286 }
287 }
288 }
289 Ok(false)
290 }
291
292 pub fn start(&self, server: &str, spec: &ProcessSpec) -> io::Result<StartOutcome> {
294 if spec.command.trim().is_empty() {
295 return Err(io::Error::new(
296 io::ErrorKind::InvalidInput,
297 "process command must not be empty",
298 ));
299 }
300
301 let mut state = self.read_state(server)?;
302 if let Some(pid) = state.pid {
303 if process_is_alive(pid) {
304 state.status = ServerStatus::Running;
305 state.updated_at_epoch_secs = now_epoch_secs();
306 self.write_state(server, &state)?;
307 return Ok(StartOutcome::AlreadyRunning);
308 }
309
310 state.status = ServerStatus::Stopped;
311 state.pid = None;
312 }
313
314 fs::create_dir_all(self.logs_dir())?;
315 let log_file = self.open_log_append(server)?;
316 let err_file = log_file.try_clone()?;
317
318 let mut cmd = Command::new(&spec.command);
319 cmd.args(&spec.args)
320 .envs(&spec.env)
321 .stdin(Stdio::null())
322 .stdout(Stdio::from(log_file))
323 .stderr(Stdio::from(err_file));
324 apply_resource_limits(&mut cmd, spec.max_memory_bytes, spec.max_file_descriptors);
325 let child = cmd
326 .spawn()
327 .map_err(|e| io::Error::new(e.kind(), format!("failed to spawn process: {e}")))?;
328 let pid = child.id();
329 drop(child);
330
331 state.status = ServerStatus::Running;
332 state.pid = Some(pid);
333 state.command = Some(spec.command.clone());
334 state.args = spec.args.clone();
335 state.auto_restart_enabled = spec.auto_restart.map(|p| p.enabled).unwrap_or(false);
336 state.max_restarts = spec.auto_restart.map(|p| p.max_restarts).unwrap_or(0);
337 state.restart_attempts = 0;
338 state.updated_at_epoch_secs = now_epoch_secs();
339 self.write_state(server, &state)?;
340 self.append_log(server, &format!("START pid={pid}"))?;
341 self.append_audit_event(AuditEvent {
342 timestamp_epoch_secs: now_epoch_secs(),
343 server: server.to_string(),
344 action: "start".to_string(),
345 pid: Some(pid),
346 command: Some(spec.command.clone()),
347 args: if spec.args.is_empty() {
348 None
349 } else {
350 Some(spec.args.clone())
351 },
352 })?;
353 Ok(StartOutcome::Started)
354 }
355
356 pub fn stop(&self, server: &str) -> io::Result<StopOutcome> {
358 let mut state = self.read_state(server)?;
359 let old_pid = state.pid;
360 let old_command = state.command.clone();
361 let old_args = state.args.clone();
362 let mut outcome = StopOutcome::AlreadyStopped;
363 let pid_to_stop = state.pid.filter(|pid| process_is_alive(*pid));
364
365 if pid_to_stop.is_some() || state.status == ServerStatus::Running {
366 outcome = StopOutcome::Stopped;
367 }
368
369 state.status = ServerStatus::Stopped;
371 state.pid = None;
372 state.restart_attempts = 0;
373 state.updated_at_epoch_secs = now_epoch_secs();
374 self.write_state(server, &state)?;
375 self.append_log(server, "STOP")?;
376
377 if let Some(pid) = pid_to_stop {
378 terminate_process(pid)?;
379 }
380
381 for _ in 0..5 {
383 let latest = self.read_state(server)?;
384 let Some(pid) = latest.pid else {
385 break;
386 };
387 if !process_is_alive(pid) {
388 break;
389 }
390 terminate_process(pid)?;
391 let mut reset = latest;
392 reset.status = ServerStatus::Stopped;
393 reset.pid = None;
394 reset.restart_attempts = 0;
395 reset.updated_at_epoch_secs = now_epoch_secs();
396 self.write_state(server, &reset)?;
397 thread::sleep(Duration::from_millis(20));
398 }
399
400 if outcome == StopOutcome::Stopped {
401 self.append_audit_event(AuditEvent {
402 timestamp_epoch_secs: now_epoch_secs(),
403 server: server.to_string(),
404 action: "stop".to_string(),
405 pid: old_pid,
406 command: old_command,
407 args: if old_args.is_empty() {
408 None
409 } else {
410 Some(old_args)
411 },
412 })?;
413 }
414 Ok(outcome)
415 }
416
417 pub fn restart(&self, server: &str, spec: &ProcessSpec) -> io::Result<()> {
419 let _ = self.stop(server)?;
420 let _ = self.start(server, spec)?;
421 let state = self.read_state(server)?;
422 self.append_audit_event(AuditEvent {
423 timestamp_epoch_secs: now_epoch_secs(),
424 server: server.to_string(),
425 action: "restart".to_string(),
426 pid: state.pid,
427 command: state.command,
428 args: if state.args.is_empty() {
429 None
430 } else {
431 Some(state.args)
432 },
433 })?;
434 Ok(())
435 }
436
437 pub fn run_supervisor(&self, server: &str, spec: &ProcessSpec) -> io::Result<()> {
439 let policy = match spec.auto_restart {
440 Some(policy) if policy.enabled => policy,
441 _ => return Ok(()),
442 };
443
444 let runtime = tokio::runtime::Builder::new_current_thread()
445 .enable_time()
446 .build()
447 .map_err(|e| io::Error::other(format!("failed to build tokio runtime: {e}")))?;
448
449 runtime.block_on(self.run_supervisor_loop(server, spec, policy))
450 }
451
452 async fn run_supervisor_loop(
454 &self,
455 server: &str,
456 spec: &ProcessSpec,
457 policy: AutoRestartPolicy,
458 ) -> io::Result<()> {
459 let poll_interval = Duration::from_millis(100);
460 let mut restart_attempts = self.read_state(server)?.restart_attempts;
461
462 loop {
463 let state = self.read_state(server)?;
464 if state.status != ServerStatus::Running {
465 return Ok(());
466 }
467
468 let monitored_pid = match state.pid {
469 Some(pid) => pid,
470 None => {
471 tokio::time::sleep(poll_interval).await;
472 continue;
473 }
474 };
475
476 loop {
477 if !process_is_alive(monitored_pid) {
478 break;
479 }
480 tokio::time::sleep(poll_interval).await;
481 let latest = self.read_state(server)?;
482 if latest.status != ServerStatus::Running {
483 return Ok(());
484 }
485 if latest.pid != Some(monitored_pid) {
486 return Ok(());
488 }
489 }
490
491 let state_after_exit = self.read_state(server)?;
492 if state_after_exit.status != ServerStatus::Running {
493 return Ok(());
494 }
495 if state_after_exit.pid != Some(monitored_pid) {
496 return Ok(());
497 }
498
499 self.append_log(server, "EXIT")?;
500 self.append_audit_event(AuditEvent {
501 timestamp_epoch_secs: now_epoch_secs(),
502 server: server.to_string(),
503 action: "exit".to_string(),
504 pid: Some(monitored_pid),
505 command: state_after_exit.command.clone(),
506 args: if state_after_exit.args.is_empty() {
507 None
508 } else {
509 Some(state_after_exit.args.clone())
510 },
511 })?;
512
513 if restart_attempts >= policy.max_restarts {
514 let mut stopped_state = state_after_exit;
515 stopped_state.status = ServerStatus::Stopped;
516 stopped_state.pid = None;
517 stopped_state.updated_at_epoch_secs = now_epoch_secs();
518 stopped_state.restart_attempts = restart_attempts;
519 self.write_state(server, &stopped_state)?;
520 return Ok(());
521 }
522
523 if self.server_denied_by_policy(server)? {
524 let mut stopped_state = self.read_state(server)?;
525 stopped_state.status = ServerStatus::Stopped;
526 stopped_state.pid = None;
527 stopped_state.updated_at_epoch_secs = now_epoch_secs();
528 self.write_state(server, &stopped_state)?;
529 self.append_log(server, "POLICY_DENIED_AUTO_RESTART")?;
530 self.append_audit_event(AuditEvent {
531 timestamp_epoch_secs: now_epoch_secs(),
532 server: server.to_string(),
533 action: "policy-denied".to_string(),
534 pid: Some(monitored_pid),
535 command: stopped_state.command.clone(),
536 args: if stopped_state.args.is_empty() {
537 None
538 } else {
539 Some(stopped_state.args.clone())
540 },
541 })?;
542 return Ok(());
543 }
544
545 let control_state = self.read_state(server)?;
546 if control_state.status != ServerStatus::Running
547 || control_state.pid != Some(monitored_pid)
548 {
549 return Ok(());
550 }
551
552 let log_file = self.open_log_append(server)?;
553 let err_file = log_file.try_clone()?;
554 let mut cmd = Command::new(&spec.command);
555 cmd.args(&spec.args)
556 .envs(&spec.env)
557 .stdin(Stdio::null())
558 .stdout(Stdio::from(log_file))
559 .stderr(Stdio::from(err_file));
560 apply_resource_limits(&mut cmd, spec.max_memory_bytes, spec.max_file_descriptors);
561 let child = cmd
562 .spawn()
563 .map_err(|e| io::Error::new(e.kind(), format!("failed to spawn process: {e}")))?;
564 let pid = child.id();
565 drop(child);
566
567 if self.read_state(server)?.status != ServerStatus::Running {
569 let _ = terminate_process(pid);
570 return Ok(());
571 }
572
573 restart_attempts += 1;
574 let mut restarted_state = self.read_state(server)?;
575 restarted_state.status = ServerStatus::Running;
576 restarted_state.pid = Some(pid);
577 restarted_state.command = Some(spec.command.clone());
578 restarted_state.args = spec.args.clone();
579 restarted_state.updated_at_epoch_secs = now_epoch_secs();
580 restarted_state.restart_attempts = restart_attempts;
581 self.write_state(server, &restarted_state)?;
582 self.append_log(
583 server,
584 &format!(
585 "AUTO_RESTART pid={pid} attempt={}/{}",
586 restart_attempts, policy.max_restarts
587 ),
588 )?;
589 self.append_audit_event(AuditEvent {
590 timestamp_epoch_secs: now_epoch_secs(),
591 server: server.to_string(),
592 action: "auto-restart".to_string(),
593 pid: Some(pid),
594 command: Some(spec.command.clone()),
595 args: if spec.args.is_empty() {
596 None
597 } else {
598 Some(spec.args.clone())
599 },
600 })?;
601 }
602 }
603
604 pub fn tail_logs(&self, server: &str, lines: usize) -> io::Result<Vec<String>> {
606 if lines == 0 {
607 return Ok(Vec::new());
608 }
609
610 let path = self.log_path(server);
611 if !path.exists() {
612 return Ok(Vec::new());
613 }
614
615 let content = fs::read_to_string(path)?;
616 let all: Vec<String> = content.lines().map(ToString::to_string).collect();
617 if all.len() <= lines {
618 return Ok(all);
619 }
620
621 Ok(all[all.len() - lines..].to_vec())
622 }
623
624 pub fn record_audit_event(
626 &self,
627 server: &str,
628 action: &str,
629 pid: Option<u32>,
630 command: Option<&str>,
631 args: Option<&[String]>,
632 ) -> io::Result<()> {
633 if action.trim().is_empty() {
634 return Err(io::Error::new(
635 io::ErrorKind::InvalidInput,
636 "audit action must not be empty",
637 ));
638 }
639
640 self.append_audit_event(AuditEvent {
641 timestamp_epoch_secs: now_epoch_secs(),
642 server: server.to_string(),
643 action: action.to_string(),
644 pid,
645 command: command.map(ToString::to_string),
646 args: args.filter(|v| !v.is_empty()).map(|v| v.to_vec()),
647 })
648 }
649
650 fn runtime_dir(&self) -> PathBuf {
652 self.berth_home.join("runtime")
653 }
654
655 fn logs_dir(&self) -> PathBuf {
657 self.berth_home.join("logs")
658 }
659
660 fn audit_dir(&self) -> PathBuf {
662 self.berth_home.join("audit")
663 }
664
665 fn state_path(&self, server: &str) -> PathBuf {
667 self.runtime_dir().join(format!("{server}.toml"))
668 }
669
670 fn log_path(&self, server: &str) -> PathBuf {
672 self.logs_dir().join(format!("{server}.log"))
673 }
674
675 fn audit_log_path(&self) -> PathBuf {
677 self.audit_dir().join("audit.jsonl")
678 }
679
680 fn policy_path(&self) -> PathBuf {
682 self.berth_home.join("policy.toml")
683 }
684
685 fn server_denied_by_policy(&self, server: &str) -> io::Result<bool> {
687 let policy_path = self.policy_path();
688 if !policy_path.exists() {
689 return Ok(false);
690 }
691
692 let content = fs::read_to_string(&policy_path)?;
693 let policy: RuntimePolicyFile = toml::from_str(&content).map_err(|e| {
694 io::Error::new(
695 io::ErrorKind::InvalidData,
696 format!("failed to parse policy file {}: {e}", policy_path.display()),
697 )
698 })?;
699
700 Ok(policy.servers.deny.iter().any(|entry| {
701 let normalized = entry.trim();
702 normalized == "*" || normalized.eq_ignore_ascii_case(server)
703 }))
704 }
705
706 fn read_state(&self, server: &str) -> io::Result<RuntimeState> {
708 let path = self.state_path(server);
709 if !path.exists() {
710 return Ok(RuntimeState::default());
711 }
712
713 let content = fs::read_to_string(path)?;
714 toml::from_str(&content).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
715 }
716
717 fn write_state(&self, server: &str, state: &RuntimeState) -> io::Result<()> {
719 fs::create_dir_all(self.runtime_dir())?;
720 let serialized = toml::to_string_pretty(state)
721 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
722 fs::write(self.state_path(server), serialized)
723 }
724
725 fn append_log(&self, server: &str, event: &str) -> io::Result<()> {
727 let mut file = self.open_log_append(server)?;
728 writeln!(file, "[{}] {}", now_epoch_secs(), event)
729 }
730
731 fn open_log_append(&self, server: &str) -> io::Result<std::fs::File> {
733 fs::create_dir_all(self.logs_dir())?;
734 OpenOptions::new()
735 .create(true)
736 .append(true)
737 .open(self.log_path(server))
738 }
739
740 fn append_audit_event(&self, event: AuditEvent) -> io::Result<()> {
742 fs::create_dir_all(self.audit_dir())?;
743 let json = serde_json::to_string(&event)
744 .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
745 let mut file = OpenOptions::new()
746 .create(true)
747 .append(true)
748 .open(self.audit_log_path())?;
749 writeln!(file, "{json}")
750 }
751}
752
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Yields `0` if the system clock reports a time before the epoch.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
760
761#[cfg(unix)]
763fn apply_resource_limits(
764 cmd: &mut Command,
765 max_memory_bytes: Option<u64>,
766 max_file_descriptors: Option<u64>,
767) {
768 use std::os::unix::process::CommandExt;
769
770 if max_memory_bytes.is_none() && max_file_descriptors.is_none() {
771 return;
772 }
773
774 unsafe {
775 cmd.pre_exec(move || {
776 if let Some(bytes) = max_memory_bytes {
777 let rlim = libc::rlimit {
778 rlim_cur: bytes,
779 rlim_max: bytes,
780 };
781 if libc::setrlimit(libc::RLIMIT_AS, &rlim) != 0 {
782 return Err(io::Error::last_os_error());
783 }
784 }
785 if let Some(n) = max_file_descriptors {
786 let rlim = libc::rlimit {
787 rlim_cur: n,
788 rlim_max: n,
789 };
790 if libc::setrlimit(libc::RLIMIT_NOFILE, &rlim) != 0 {
791 return Err(io::Error::last_os_error());
792 }
793 }
794 Ok(())
795 });
796 }
797}
798
/// Non-Unix stand-in for resource limiting: accepts the same arguments and
/// does nothing, so the limits in a `ProcessSpec` are ignored on these platforms.
#[cfg(not(unix))]
fn apply_resource_limits(
    _cmd: &mut Command,
    _max_memory_bytes: Option<u64>,
    _max_file_descriptors: Option<u64>,
) {
}
807
/// Tells whether a process with `pid` is currently alive (Unix).
///
/// Asks `ps` for the process state. A failing `ps` exit status, empty output,
/// or a state starting with `Z` (zombie) all count as "not alive". If `ps`
/// itself cannot be run, falls back to the exit status of `kill -0 <pid>`.
#[cfg(unix)]
fn process_is_alive(pid: u32) -> bool {
    let pid_arg = pid.to_string();

    let probe = Command::new("ps")
        .args(["-o", "stat=", "-p", pid_arg.as_str()])
        .output();

    match probe {
        Ok(report) => {
            if !report.status.success() {
                return false;
            }
            let raw = String::from_utf8_lossy(&report.stdout);
            let stat = raw.trim();
            !(stat.is_empty() || stat.starts_with('Z'))
        }
        Err(_) => {
            let signalled = Command::new("kill").arg("-0").arg(&pid_arg).status();
            signalled.is_ok_and(|s| s.success())
        }
    }
}
836
/// Tells whether a process with `pid` is currently alive (Windows).
///
/// Runs `tasklist` filtered to the pid through `cmd /C` and reports `true`
/// when the command succeeds and its output contains the pid as text.
#[cfg(windows)]
fn process_is_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let listing = Command::new("cmd")
        .args(["/C", "tasklist", "/FI", filter.as_str()])
        .output();

    let Ok(report) = listing else {
        return false;
    };
    if !report.status.success() {
        return false;
    }
    let text = String::from_utf8_lossy(&report.stdout);
    text.contains(&pid.to_string())
}
850
/// Fallback for platforms that are neither Unix nor Windows: liveness cannot
/// be checked here, so every pid is reported as not alive.
#[cfg(not(any(unix, windows)))]
fn process_is_alive(_pid: u32) -> bool {
    false
}
856
857#[cfg(unix)]
859fn terminate_process(pid: u32) -> io::Result<()> {
860 let pid_str = pid.to_string();
861 let status = Command::new("kill").arg(&pid_str).status()?;
862 if !status.success() {
863 return Err(io::Error::new(
864 io::ErrorKind::PermissionDenied,
865 format!("failed to signal process {pid}"),
866 ));
867 }
868
869 if wait_for_process_exit(pid, 50, Duration::from_millis(20)) {
870 return Ok(());
871 }
872
873 let kill_status = Command::new("kill").args(["-9", &pid_str]).status()?;
875 if kill_status.success() {
876 Ok(())
877 } else {
878 Err(io::Error::new(
879 io::ErrorKind::PermissionDenied,
880 format!("failed to force terminate process {pid}"),
881 ))
882 }
883}
884
/// Terminates the process tree rooted at `pid` (Windows), escalating if needed.
///
/// First asks politely with `taskkill /PID <pid> /T` and waits up to
/// 50 × 20 ms for the process to disappear. If the polite request is refused
/// or the process outlives the wait, `taskkill ... /F` is used. A refused
/// polite request is not treated as final, because `taskkill` without `/F`
/// can reject processes that only accept forceful termination.
///
/// A failing `taskkill` is not an error when the process is already gone.
///
/// # Errors
///
/// Propagates errors from running `taskkill`, and returns `PermissionDenied`
/// when the forced termination fails for a process that is still alive.
#[cfg(windows)]
fn terminate_process(pid: u32) -> io::Result<()> {
    let pid_arg = pid.to_string();

    let graceful = Command::new("taskkill")
        .args(["/PID", pid_arg.as_str(), "/T"])
        .status()?;
    if graceful.success() && wait_for_process_exit(pid, 50, Duration::from_millis(20)) {
        return Ok(());
    }
    if !process_is_alive(pid) {
        // Exited on its own, or before the request could be delivered.
        return Ok(());
    }

    let forced = Command::new("taskkill")
        .args(["/PID", pid_arg.as_str(), "/T", "/F"])
        .status()?;
    if forced.success() || !process_is_alive(pid) {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::PermissionDenied,
            format!("failed to force terminate process {pid}"),
        ))
    }
}
915
/// Fallback for platforms that are neither Unix nor Windows: always fails
/// with an `Unsupported` error.
#[cfg(not(any(unix, windows)))]
fn terminate_process(_pid: u32) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "process termination is not supported on this platform",
    );
    Err(unsupported)
}
924
925fn wait_for_process_exit(pid: u32, attempts: u32, interval: Duration) -> bool {
927 for _ in 0..attempts {
928 if !process_is_alive(pid) {
929 return true;
930 }
931 thread::sleep(interval);
932 }
933 !process_is_alive(pid)
934}
935
936#[cfg(test)]
937mod tests {
938 use super::*;
939 use std::thread;
940 use std::time::Duration;
941
942 fn manager() -> (tempfile::TempDir, RuntimeManager) {
943 let tmp = tempfile::tempdir().unwrap();
944 let manager = RuntimeManager::new(tmp.path().join(".berth"));
945 (tmp, manager)
946 }
947
948 fn wait_until_process_exits(manager: &RuntimeManager, server: &str) {
949 for _ in 0..100 {
950 let state = manager.read_state(server).unwrap();
951 match state.pid {
952 Some(pid) if process_is_alive(pid) => thread::sleep(Duration::from_millis(20)),
953 _ => return,
954 }
955 }
956 }
957
958 fn write_policy_deny_server(tmp: &tempfile::TempDir, server: &str) {
959 fs::create_dir_all(tmp.path().join(".berth")).unwrap();
960 fs::write(
961 tmp.path().join(".berth").join("policy.toml"),
962 format!("[servers]\ndeny = [\"{server}\"]\n"),
963 )
964 .unwrap();
965 }
966
967 #[cfg(unix)]
968 fn long_running_spec() -> ProcessSpec {
969 ProcessSpec {
970 command: "sh".to_string(),
971 args: vec!["-c".to_string(), "sleep 60".to_string()],
972 env: BTreeMap::new(),
973 auto_restart: None,
974 ..Default::default()
975 }
976 }
977
978 #[cfg(windows)]
979 fn long_running_spec() -> ProcessSpec {
980 ProcessSpec {
981 command: "cmd".to_string(),
982 args: vec![
983 "/C".to_string(),
984 "timeout".to_string(),
985 "/T".to_string(),
986 "60".to_string(),
987 "/NOBREAK".to_string(),
988 ],
989 env: BTreeMap::new(),
990 auto_restart: None,
991 ..Default::default()
992 }
993 }
994
995 #[cfg(not(any(unix, windows)))]
996 fn long_running_spec() -> ProcessSpec {
997 ProcessSpec {
998 command: "unsupported".to_string(),
999 args: vec![],
1000 env: BTreeMap::new(),
1001 auto_restart: None,
1002 ..Default::default()
1003 }
1004 }
1005
1006 #[cfg(unix)]
1007 fn ignores_term_spec() -> ProcessSpec {
1008 ProcessSpec {
1009 command: "sh".to_string(),
1010 args: vec![
1011 "-c".to_string(),
1012 "trap '' TERM; while true; do sleep 1; done".to_string(),
1013 ],
1014 env: BTreeMap::new(),
1015 auto_restart: None,
1016 ..Default::default()
1017 }
1018 }
1019
1020 #[cfg(unix)]
1021 fn crash_spec_with_policy(max_restarts: u32) -> ProcessSpec {
1022 ProcessSpec {
1023 command: "sh".to_string(),
1024 args: vec!["-c".to_string(), "exit 1".to_string()],
1025 env: BTreeMap::new(),
1026 auto_restart: Some(AutoRestartPolicy {
1027 enabled: true,
1028 max_restarts,
1029 }),
1030 ..Default::default()
1031 }
1032 }
1033
1034 #[cfg(windows)]
1035 fn crash_spec_with_policy(max_restarts: u32) -> ProcessSpec {
1036 ProcessSpec {
1037 command: "cmd".to_string(),
1038 args: vec!["/C".to_string(), "exit /B 1".to_string()],
1039 env: BTreeMap::new(),
1040 auto_restart: Some(AutoRestartPolicy {
1041 enabled: true,
1042 max_restarts,
1043 }),
1044 ..Default::default()
1045 }
1046 }
1047
1048 #[cfg(not(any(unix, windows)))]
1049 fn crash_spec_with_policy(max_restarts: u32) -> ProcessSpec {
1050 ProcessSpec {
1051 command: "unsupported".to_string(),
1052 args: vec![],
1053 env: BTreeMap::new(),
1054 auto_restart: Some(AutoRestartPolicy {
1055 enabled: true,
1056 max_restarts,
1057 }),
1058 ..Default::default()
1059 }
1060 }
1061
1062 #[cfg(unix)]
1063 fn fail_once_then_run_spec(marker_path: &str, max_restarts: u32) -> ProcessSpec {
1064 ProcessSpec {
1065 command: "sh".to_string(),
1066 args: vec![
1067 "-c".to_string(),
1068 format!(
1069 "if [ -f '{marker_path}' ]; then sleep 60; else touch '{marker_path}'; exit 1; fi"
1070 ),
1071 ],
1072 env: BTreeMap::new(),
1073 auto_restart: Some(AutoRestartPolicy {
1074 enabled: true,
1075 max_restarts,
1076 }),
1077 ..Default::default()
1078 }
1079 }
1080
1081 #[cfg(windows)]
1082 fn fail_once_then_run_spec(marker_path: &str, max_restarts: u32) -> ProcessSpec {
1083 ProcessSpec {
1084 command: "cmd".to_string(),
1085 args: vec![
1086 "/C".to_string(),
1087 format!(
1088 "if exist \"{marker_path}\" (timeout /T 60 /NOBREAK >NUL) else (type nul > \"{marker_path}\" & exit /B 1)"
1089 ),
1090 ],
1091 env: BTreeMap::new(),
1092 auto_restart: Some(AutoRestartPolicy {
1093 enabled: true,
1094 max_restarts,
1095 }),
1096 ..Default::default()
1097 }
1098 }
1099
1100 #[cfg(not(any(unix, windows)))]
1101 fn fail_once_then_run_spec(_marker_path: &str, max_restarts: u32) -> ProcessSpec {
1102 ProcessSpec {
1103 command: "unsupported".to_string(),
1104 args: vec![],
1105 env: BTreeMap::new(),
1106 auto_restart: Some(AutoRestartPolicy {
1107 enabled: true,
1108 max_restarts,
1109 }),
1110 ..Default::default()
1111 }
1112 }
1113
1114 #[test]
1115 fn version_is_set() {
1116 assert!(!version().is_empty());
1117 }
1118
1119 #[test]
1120 fn missing_state_defaults_to_stopped() {
1121 let (_tmp, manager) = manager();
1122 let status = manager.status("github").unwrap();
1123 assert_eq!(status, ServerStatus::Stopped);
1124 }
1125
1126 #[test]
1127 fn start_transitions_to_running() {
1128 let (_tmp, manager) = manager();
1129 let spec = long_running_spec();
1130 let outcome = manager.start("github", &spec).unwrap();
1131 assert_eq!(outcome, StartOutcome::Started);
1132 assert_eq!(manager.status("github").unwrap(), ServerStatus::Running);
1133 let _ = manager.stop("github");
1134 }
1135
1136 #[test]
1137 fn starting_running_server_reports_already_running() {
1138 let (_tmp, manager) = manager();
1139 let spec = long_running_spec();
1140 manager.start("github", &spec).unwrap();
1141 let outcome = manager.start("github", &spec).unwrap();
1142 assert_eq!(outcome, StartOutcome::AlreadyRunning);
1143 let _ = manager.stop("github");
1144 }
1145
1146 #[test]
1147 fn stop_transitions_to_stopped() {
1148 let (_tmp, manager) = manager();
1149 let spec = long_running_spec();
1150 manager.start("github", &spec).unwrap();
1151 let outcome = manager.stop("github").unwrap();
1152 assert_eq!(outcome, StopOutcome::Stopped);
1153 assert_eq!(manager.status("github").unwrap(), ServerStatus::Stopped);
1154 }
1155
1156 #[cfg(unix)]
1157 #[test]
1158 fn stop_escalates_when_process_ignores_term() {
1159 let (_tmp, manager) = manager();
1160 let spec = ignores_term_spec();
1161 manager.start("github", &spec).unwrap();
1162 let outcome = manager.stop("github").unwrap();
1163 assert_eq!(outcome, StopOutcome::Stopped);
1164 assert_eq!(manager.status("github").unwrap(), ServerStatus::Stopped);
1165 }
1166
1167 #[test]
1168 fn stopping_stopped_server_reports_already_stopped() {
1169 let (_tmp, manager) = manager();
1170 let outcome = manager.stop("github").unwrap();
1171 assert_eq!(outcome, StopOutcome::AlreadyStopped);
1172 }
1173
1174 #[test]
1175 fn restart_ends_in_running_state() {
1176 let (_tmp, manager) = manager();
1177 let spec = long_running_spec();
1178 manager.restart("github", &spec).unwrap();
1179 assert_eq!(manager.status("github").unwrap(), ServerStatus::Running);
1180 let _ = manager.stop("github");
1181 }
1182
1183 #[test]
1184 fn tail_logs_returns_last_lines() {
1185 let (_tmp, manager) = manager();
1186 let spec = long_running_spec();
1187 manager.start("github", &spec).unwrap();
1188 manager.stop("github").unwrap();
1189 manager.start("github", &spec).unwrap();
1190 let _ = manager.stop("github");
1191
1192 let lines = manager.tail_logs("github", 2).unwrap();
1193 assert_eq!(lines.len(), 2);
1194 assert!(lines.iter().any(|l| l.contains("STOP")));
1195 assert!(lines.iter().any(|l| l.contains("START")));
1196 }
1197
1198 #[test]
1199 fn start_stop_writes_audit_events() {
1200 let (_tmp, manager) = manager();
1201 let spec = long_running_spec();
1202 manager.start("github", &spec).unwrap();
1203 manager.stop("github").unwrap();
1204
1205 let audit_path = manager.audit_log_path();
1206 let content = fs::read_to_string(audit_path).unwrap();
1207 let lines: Vec<&str> = content.lines().collect();
1208 assert!(lines.iter().any(|l| l.contains("\"action\":\"start\"")));
1209 assert!(lines.iter().any(|l| l.contains("\"action\":\"stop\"")));
1210 }
1211
/// A state file containing syntactically broken TOML must make `status`
/// fail with `io::ErrorKind::InvalidData`.
#[test]
fn malformed_state_file_returns_error() {
    let (tmp, mgr) = manager();

    // Plant an unterminated array where the "github" state file is expected.
    let state_dir = tmp.path().join(".berth/runtime");
    fs::create_dir_all(&state_dir).unwrap();
    let state_file = state_dir.join("github.toml");
    fs::write(state_file, "not = [valid").unwrap();

    let kind = mgr.status("github").unwrap_err().kind();
    assert_eq!(kind, io::ErrorKind::InvalidData);
}
1222
/// Once the started process has exited, `status_with_spec` given a
/// replacement spec must report `Running` and log an `auto-restart` action.
#[test]
fn status_with_spec_auto_restarts_crashed_process() {
    let (_tmp, mgr) = manager();
    let crashing = crash_spec_with_policy(1);
    let replacement = long_running_spec();

    mgr.start("github", &crashing).unwrap();
    wait_until_process_exits(&mgr, "github");

    assert_eq!(
        mgr.status_with_spec("github", Some(&replacement)).unwrap(),
        ServerStatus::Running
    );

    let audit_log = fs::read_to_string(mgr.audit_log_path()).unwrap();
    assert!(audit_log.contains("\"action\":\"auto-restart\""));

    // Best-effort cleanup of the restarted process.
    let _ = mgr.stop("github");
}
1238
/// If a deny policy for the server is written after its process exits,
/// `status_with_spec` must report `Stopped`, log `policy-denied`, and log no
/// `auto-restart` action.
#[test]
fn status_with_spec_does_not_restart_when_server_denied_by_policy() {
    let (tmp, mgr) = manager();
    let crashing = crash_spec_with_policy(1);
    let replacement = long_running_spec();

    mgr.start("github", &crashing).unwrap();
    wait_until_process_exits(&mgr, "github");
    // The deny policy is written only after the process has already exited.
    write_policy_deny_server(&tmp, "github");

    assert_eq!(
        mgr.status_with_spec("github", Some(&replacement)).unwrap(),
        ServerStatus::Stopped
    );

    let audit_log = fs::read_to_string(mgr.audit_log_path()).unwrap();
    assert!(audit_log.contains("\"action\":\"policy-denied\""));
    assert!(!audit_log.contains("\"action\":\"auto-restart\""));
}
1255
/// With a crashing spec built by `crash_spec_with_policy(1)`, the second
/// status check after a crash must report `Stopped`, and the audit log must
/// hold exactly one `auto-restart` line.
#[test]
fn auto_restart_respects_max_restarts_bound() {
    let (_tmp, mgr) = manager();
    let crashing = crash_spec_with_policy(1);

    mgr.start("github", &crashing).unwrap();
    wait_until_process_exits(&mgr, "github");

    // First check: its status is not asserted; the audit count below
    // expects exactly one restart overall.
    let _ = mgr.status_with_spec("github", Some(&crashing)).unwrap();
    wait_until_process_exits(&mgr, "github");

    // Second check after another exit: no further restart is expected.
    assert_eq!(
        mgr.status_with_spec("github", Some(&crashing)).unwrap(),
        ServerStatus::Stopped
    );

    let audit_log = fs::read_to_string(mgr.audit_log_path()).unwrap();
    let restarts = audit_log
        .lines()
        .filter(|entry| entry.contains("\"action\":\"auto-restart\""))
        .count();
    assert_eq!(restarts, 1);
}
1275
/// A supervisor running on a background thread must bring the server back
/// (one recorded restart attempt and a live pid) without this test ever
/// calling `status`.
#[cfg(any(unix, windows))]
#[test]
fn tokio_supervisor_recovers_crash_without_status_polling() {
    let (tmp, mgr) = manager();

    // Marker path handed to `fail_once_then_run_spec`; only its parent
    // directory is created here.
    let flag_file = tmp.path().join(".berth/runtime/github.restart-flag");
    fs::create_dir_all(flag_file.parent().unwrap()).unwrap();
    let flag_arg = flag_file.to_string_lossy().to_string();

    // Start the process with auto-restart cleared in the spec, so the state
    // is inspected directly and recovery is left to the supervisor thread.
    let supervised = fail_once_then_run_spec(&flag_arg, 1);
    let mut unsupervised = supervised.clone();
    unsupervised.auto_restart = None;
    mgr.start("github", &unsupervised).unwrap();

    // The supervisor gets its own manager over the same `.berth` directory.
    let background = RuntimeManager::new(tmp.path().join(".berth"));
    let worker = thread::spawn(move || background.run_supervisor("github", &supervised));

    // Poll the persisted state up to 200 times, 20 ms apart, stopping as
    // soon as one restart attempt with a live pid is observed.
    let recovered = (0..200).any(|_| {
        let state = mgr.read_state("github").unwrap();
        if state.restart_attempts == 1 && state.pid.is_some_and(process_is_alive) {
            return true;
        }
        thread::sleep(Duration::from_millis(20));
        false
    });
    assert!(recovered);

    // Best-effort stop, then require the supervisor thread to finish cleanly.
    let _ = mgr.stop("github");
    worker.join().unwrap().unwrap();
}
1308
/// Running the supervisor for a server that policy denies must return `Ok`,
/// leave the persisted state `Stopped` with no pid, and log `policy-denied`
/// without any `auto-restart` action.
#[cfg(any(unix, windows))]
#[test]
fn tokio_supervisor_does_not_restart_when_server_denied_by_policy() {
    let (tmp, mgr) = manager();

    // Start with auto-restart cleared in the spec, wait for the process to
    // exit, and only then write the deny policy.
    let mut unsupervised = crash_spec_with_policy(1);
    unsupervised.auto_restart = None;
    mgr.start("github", &unsupervised).unwrap();
    wait_until_process_exits(&mgr, "github");
    write_policy_deny_server(&tmp, "github");

    // Called synchronously, so the supervisor must return for the test to proceed.
    mgr.run_supervisor("github", &crash_spec_with_policy(1)).unwrap();

    let final_state = mgr.read_state("github").unwrap();
    assert_eq!(final_state.status, ServerStatus::Stopped);
    assert_eq!(final_state.pid, None);

    let audit_log = fs::read_to_string(mgr.audit_log_path()).unwrap();
    assert!(audit_log.contains("\"action\":\"policy-denied\""));
    assert!(!audit_log.contains("\"action\":\"auto-restart\""));
}
1330}