Skip to main content

berth_runtime/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2026 Schwimmbeck Dominik
3
4//! Synchronous runtime state manager for installed MCP servers.
5
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::fmt;
9use std::fs::{self, OpenOptions};
10use std::io::{self, Write};
11use std::path::PathBuf;
12use std::process::{Command, Stdio};
13use std::thread;
14use std::time::{Duration, SystemTime, UNIX_EPOCH};
15
16/// Returns crate version for runtime diagnostics/tests.
17pub fn version() -> &'static str {
18    env!("CARGO_PKG_VERSION")
19}
20
/// Runtime status persisted for a server.
///
/// Serialized with lowercase variant names (`running` / `stopped`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum ServerStatus {
    /// The persisted state records the server as running.
    Running,
    /// The persisted state records the server as stopped; also the status assumed
    /// when no state file exists.
    Stopped,
}
28
29impl fmt::Display for ServerStatus {
30    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
31        match self {
32            ServerStatus::Running => write!(f, "running"),
33            ServerStatus::Stopped => write!(f, "stopped"),
34        }
35    }
36}
37
/// Result of attempting to start a server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StartOutcome {
    /// A new process was spawned and recorded in the runtime state.
    Started,
    /// The pid already recorded in the runtime state was still alive; nothing was spawned.
    AlreadyRunning,
}
44
/// Result of attempting to stop a server.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StopOutcome {
    /// The server was recorded as running (or its recorded pid was alive) and has been stopped.
    Stopped,
    /// The server was already recorded as stopped and no live pid was found.
    AlreadyStopped,
}
51
/// Runtime process specification for launching a server.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ProcessSpec {
    /// Program to execute; `RuntimeManager::start` rejects an empty or whitespace-only value.
    pub command: String,
    /// Arguments passed to `command`.
    pub args: Vec<String>,
    /// Environment variables set on the spawned process.
    pub env: BTreeMap<String, String>,
    /// Optional auto-restart policy; `None` means no automatic restarts.
    pub auto_restart: Option<AutoRestartPolicy>,
    /// Optional address-space limit in bytes (applied via `RLIMIT_AS` on Unix, ignored elsewhere).
    #[serde(default)]
    pub max_memory_bytes: Option<u64>,
    /// Optional open-file limit (applied via `RLIMIT_NOFILE` on Unix, ignored elsewhere).
    #[serde(default)]
    pub max_file_descriptors: Option<u64>,
}
64
/// Auto-restart policy applied to supervised server processes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub struct AutoRestartPolicy {
    /// Whether automatic restarts are performed at all.
    pub enabled: bool,
    /// Upper bound on the number of automatic restart attempts.
    pub max_restarts: u32,
}
71
/// Per-server runtime state persisted as TOML under the runtime directory.
///
/// Every field except `status` and `updated_at_epoch_secs` falls back to its default
/// when missing from the file.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
struct RuntimeState {
    /// Last recorded lifecycle status.
    status: ServerStatus,
    /// Unix timestamp (seconds) of the last state update.
    updated_at_epoch_secs: u64,
    /// Pid of the most recently spawned process; cleared when the server is marked stopped.
    #[serde(default)]
    pid: Option<u32>,
    /// Command used for the most recent launch.
    #[serde(default)]
    command: Option<String>,
    /// Arguments used for the most recent launch.
    #[serde(default)]
    args: Vec<String>,
    /// Whether the spec used at `start` enabled auto-restart.
    #[serde(default)]
    auto_restart_enabled: bool,
    /// Restart bound copied from the spec used at `start`.
    #[serde(default)]
    max_restarts: u32,
    /// Automatic restarts performed since the last explicit start; reset by start and stop.
    #[serde(default)]
    restart_attempts: u32,
}
89
/// Subset of the org policy file (`policy.toml`) that the runtime reads.
#[derive(Debug, Default, Deserialize)]
struct RuntimePolicyFile {
    /// Server-level policy section; defaults to an empty section when absent.
    #[serde(default)]
    servers: RuntimePolicyServers,
}
95
/// Server section of the org policy file.
#[derive(Debug, Default, Deserialize)]
struct RuntimePolicyServers {
    /// Denied server names; matched case-insensitively, with `*` denying every server.
    #[serde(default)]
    deny: Vec<String>,
}
101
/// One audit record, written as a single JSON line with camelCase keys.
///
/// Optional fields are omitted from the JSON output when they are `None`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
struct AuditEvent {
    /// Unix timestamp (seconds) at which the event was recorded.
    timestamp_epoch_secs: u64,
    /// Name of the server the event refers to.
    server: String,
    /// Action label, e.g. `start`, `stop`, `exit`, `restart`, `auto-restart`, `policy-denied`.
    action: String,
    /// Pid associated with the event, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    pid: Option<u32>,
    /// Command associated with the event, when known.
    #[serde(skip_serializing_if = "Option::is_none")]
    command: Option<String>,
    /// Arguments associated with the event; callers pass `None` for an empty list.
    #[serde(skip_serializing_if = "Option::is_none")]
    args: Option<Vec<String>>,
}
115
116impl Default for RuntimeState {
117    fn default() -> Self {
118        RuntimeState {
119            status: ServerStatus::Stopped,
120            updated_at_epoch_secs: now_epoch_secs(),
121            pid: None,
122            command: None,
123            args: Vec::new(),
124            auto_restart_enabled: false,
125            max_restarts: 0,
126            restart_attempts: 0,
127        }
128    }
129}
130
/// File-backed lifecycle manager for installed MCP servers.
///
/// All data lives under one Berth home directory: per-server state in `runtime/`,
/// per-server logs in `logs/`, the audit trail in `audit/audit.jsonl`, and the org
/// policy in `policy.toml`.
#[derive(Debug, Clone)]
pub struct RuntimeManager {
    /// Root directory under which state, logs, audit records and policy are resolved.
    berth_home: PathBuf,
}
134
135impl RuntimeManager {
136    /// Creates a manager rooted at a Berth home directory.
137    pub fn new<P: Into<PathBuf>>(berth_home: P) -> Self {
138        RuntimeManager {
139            berth_home: berth_home.into(),
140        }
141    }
142
143    /// Returns current persisted status for a server.
144    pub fn status(&self, server: &str) -> io::Result<ServerStatus> {
145        self.status_with_spec(server, None)
146    }
147
    /// Returns the status of `server`, reconciling the persisted state with process liveness.
    ///
    /// When the persisted state says `Running` but the recorded pid is not alive (or no
    /// pid is recorded), the exit is written to the state file, the server log and the
    /// audit log. If the persisted state has auto-restart enabled with attempts left and
    /// a `spec` is supplied, the process is respawned from `spec` unless org policy
    /// denies the server.
    ///
    /// # Errors
    /// Propagates I/O errors from reading or writing state, logs, the audit log and the
    /// policy file, and from spawning the replacement process.
    pub fn status_with_spec(
        &self,
        server: &str,
        spec: Option<&ProcessSpec>,
    ) -> io::Result<ServerStatus> {
        let mut state = self.read_state(server)?;

        if state.status == ServerStatus::Running {
            // Snapshot the old process details for the audit events written below.
            let old_pid = state.pid;
            let old_command = state.command.clone();
            let old_args = state.args.clone();

            if let Some(pid) = state.pid {
                if process_is_alive(pid) {
                    return Ok(ServerStatus::Running);
                }
            }

            // The caller's spec enables auto-restart but the persisted state does not:
            // give a separately running supervisor a short window to publish a new pid
            // before declaring the server dead.
            let expects_external_supervisor = spec
                .and_then(|s| s.auto_restart)
                .is_some_and(|policy| policy.enabled)
                && !state.auto_restart_enabled;
            if expects_external_supervisor
                && self.wait_for_supervisor_replacement(server, old_pid)?
            {
                return Ok(ServerStatus::Running);
            }

            // Record that a previously running process exited.
            state.status = ServerStatus::Stopped;
            state.pid = None;
            state.updated_at_epoch_secs = now_epoch_secs();
            self.write_state(server, &state)?;
            self.append_log(server, "EXIT")?;
            self.append_audit_event(AuditEvent {
                timestamp_epoch_secs: now_epoch_secs(),
                server: server.to_string(),
                action: "exit".to_string(),
                pid: old_pid,
                command: old_command.clone(),
                args: if old_args.is_empty() {
                    None
                } else {
                    Some(old_args.clone())
                },
            })?;

            // Attempt bounded auto-restart when policy is enabled.
            if state.auto_restart_enabled && state.restart_attempts < state.max_restarts {
                if let Some(spec) = spec {
                    // Org policy is re-checked on every restart; a denied server stays stopped.
                    if self.server_denied_by_policy(server)? {
                        state.status = ServerStatus::Stopped;
                        state.pid = None;
                        state.updated_at_epoch_secs = now_epoch_secs();
                        self.write_state(server, &state)?;
                        self.append_log(server, "POLICY_DENIED_AUTO_RESTART")?;
                        self.append_audit_event(AuditEvent {
                            timestamp_epoch_secs: now_epoch_secs(),
                            server: server.to_string(),
                            action: "policy-denied".to_string(),
                            pid: old_pid,
                            command: old_command,
                            args: if old_args.is_empty() {
                                None
                            } else {
                                Some(old_args)
                            },
                        })?;
                        return Ok(ServerStatus::Stopped);
                    }

                    // Respawn with stdin detached and stdout/stderr appended to the server log.
                    let mut cmd = Command::new(&spec.command);
                    cmd.args(&spec.args)
                        .envs(&spec.env)
                        .stdin(Stdio::null())
                        .stdout(Stdio::from(self.open_log_append(server)?))
                        .stderr(Stdio::from(self.open_log_append(server)?));
                    apply_resource_limits(
                        &mut cmd,
                        spec.max_memory_bytes,
                        spec.max_file_descriptors,
                    );
                    let child = cmd.spawn().map_err(|e| {
                        io::Error::new(e.kind(), format!("failed to spawn process: {e}"))
                    })?;
                    let pid = child.id();
                    // The child handle is dropped without waiting; only the pid is tracked.
                    drop(child);

                    state.status = ServerStatus::Running;
                    state.pid = Some(pid);
                    state.command = Some(spec.command.clone());
                    state.args = spec.args.clone();
                    state.restart_attempts += 1;
                    state.updated_at_epoch_secs = now_epoch_secs();
                    self.write_state(server, &state)?;
                    self.append_log(
                        server,
                        &format!(
                            "AUTO_RESTART pid={pid} attempt={}/{}",
                            state.restart_attempts, state.max_restarts
                        ),
                    )?;
                    self.append_audit_event(AuditEvent {
                        timestamp_epoch_secs: now_epoch_secs(),
                        server: server.to_string(),
                        action: "auto-restart".to_string(),
                        pid: Some(pid),
                        command: Some(spec.command.clone()),
                        args: if spec.args.is_empty() {
                            None
                        } else {
                            Some(spec.args.clone())
                        },
                    })?;
                    return Ok(ServerStatus::Running);
                }
            }
        }

        Ok(ServerStatus::Stopped)
    }
270
271    /// Waits briefly for an external supervisor to replace a dead pid in state.
272    fn wait_for_supervisor_replacement(
273        &self,
274        server: &str,
275        old_pid: Option<u32>,
276    ) -> io::Result<bool> {
277        for _ in 0..10 {
278            thread::sleep(Duration::from_millis(50));
279            let state = self.read_state(server)?;
280            if state.status != ServerStatus::Running {
281                return Ok(false);
282            }
283            if let Some(pid) = state.pid {
284                if Some(pid) != old_pid && process_is_alive(pid) {
285                    return Ok(true);
286                }
287            }
288        }
289        Ok(false)
290    }
291
    /// Starts a server subprocess and records runtime state.
    ///
    /// Returns [`StartOutcome::AlreadyRunning`] without spawning when the pid in the
    /// persisted state is still alive. Otherwise spawns `spec.command` with stdin
    /// detached and stdout/stderr appended to the server log, persists the new state
    /// (resetting the restart counter), and writes a `start` audit event.
    ///
    /// # Errors
    /// Returns `InvalidInput` when `spec.command` is empty or whitespace-only, and
    /// propagates I/O errors from state/log/audit access and from spawning.
    pub fn start(&self, server: &str, spec: &ProcessSpec) -> io::Result<StartOutcome> {
        if spec.command.trim().is_empty() {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "process command must not be empty",
            ));
        }

        let mut state = self.read_state(server)?;
        if let Some(pid) = state.pid {
            if process_is_alive(pid) {
                // Refresh the persisted status so it matches the live process.
                state.status = ServerStatus::Running;
                state.updated_at_epoch_secs = now_epoch_secs();
                self.write_state(server, &state)?;
                return Ok(StartOutcome::AlreadyRunning);
            }

            // Stale pid: clear it in memory; it is persisted only after a successful spawn.
            state.status = ServerStatus::Stopped;
            state.pid = None;
        }

        fs::create_dir_all(self.logs_dir())?;
        let log_file = self.open_log_append(server)?;
        // stdout and stderr share one log file handle.
        let err_file = log_file.try_clone()?;

        let mut cmd = Command::new(&spec.command);
        cmd.args(&spec.args)
            .envs(&spec.env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(log_file))
            .stderr(Stdio::from(err_file));
        apply_resource_limits(&mut cmd, spec.max_memory_bytes, spec.max_file_descriptors);
        let child = cmd
            .spawn()
            .map_err(|e| io::Error::new(e.kind(), format!("failed to spawn process: {e}")))?;
        let pid = child.id();
        // The child handle is dropped without waiting; only the pid is tracked.
        drop(child);

        state.status = ServerStatus::Running;
        state.pid = Some(pid);
        state.command = Some(spec.command.clone());
        state.args = spec.args.clone();
        state.auto_restart_enabled = spec.auto_restart.map(|p| p.enabled).unwrap_or(false);
        state.max_restarts = spec.auto_restart.map(|p| p.max_restarts).unwrap_or(0);
        state.restart_attempts = 0;
        state.updated_at_epoch_secs = now_epoch_secs();
        self.write_state(server, &state)?;
        self.append_log(server, &format!("START pid={pid}"))?;
        self.append_audit_event(AuditEvent {
            timestamp_epoch_secs: now_epoch_secs(),
            server: server.to_string(),
            action: "start".to_string(),
            pid: Some(pid),
            command: Some(spec.command.clone()),
            args: if spec.args.is_empty() {
                None
            } else {
                Some(spec.args.clone())
            },
        })?;
        Ok(StartOutcome::Started)
    }
355
    /// Stops a running server subprocess and records runtime state.
    ///
    /// The state is persisted as `Stopped` before any signal is sent, then the recorded
    /// pid (if alive) is terminated. A short follow-up loop terminates any replacement
    /// pid that appears in the state afterwards. A `stop` audit event is written only
    /// when the outcome is [`StopOutcome::Stopped`].
    ///
    /// # Errors
    /// Propagates I/O errors from state/log/audit access and from process termination.
    pub fn stop(&self, server: &str) -> io::Result<StopOutcome> {
        let mut state = self.read_state(server)?;
        // Snapshot the pre-stop process details for the audit event.
        let old_pid = state.pid;
        let old_command = state.command.clone();
        let old_args = state.args.clone();
        let mut outcome = StopOutcome::AlreadyStopped;
        // Only signal a pid that is actually alive right now.
        let pid_to_stop = state.pid.filter(|pid| process_is_alive(*pid));

        if pid_to_stop.is_some() || state.status == ServerStatus::Running {
            outcome = StopOutcome::Stopped;
        }

        // Mark stopped before signaling so a background supervisor can observe intent and exit.
        state.status = ServerStatus::Stopped;
        state.pid = None;
        state.restart_attempts = 0;
        state.updated_at_epoch_secs = now_epoch_secs();
        self.write_state(server, &state)?;
        self.append_log(server, "STOP")?;

        if let Some(pid) = pid_to_stop {
            terminate_process(pid)?;
        }

        // Close a narrow race where a supervisor could spawn a replacement pid concurrently.
        for _ in 0..5 {
            let latest = self.read_state(server)?;
            let Some(pid) = latest.pid else {
                break;
            };
            if !process_is_alive(pid) {
                break;
            }
            terminate_process(pid)?;
            // Re-assert the stopped state over whatever was written in the meantime.
            let mut reset = latest;
            reset.status = ServerStatus::Stopped;
            reset.pid = None;
            reset.restart_attempts = 0;
            reset.updated_at_epoch_secs = now_epoch_secs();
            self.write_state(server, &reset)?;
            thread::sleep(Duration::from_millis(20));
        }

        if outcome == StopOutcome::Stopped {
            self.append_audit_event(AuditEvent {
                timestamp_epoch_secs: now_epoch_secs(),
                server: server.to_string(),
                action: "stop".to_string(),
                pid: old_pid,
                command: old_command,
                args: if old_args.is_empty() {
                    None
                } else {
                    Some(old_args)
                },
            })?;
        }
        Ok(outcome)
    }
416
417    /// Restarts a server by stopping then starting with the same process spec.
418    pub fn restart(&self, server: &str, spec: &ProcessSpec) -> io::Result<()> {
419        let _ = self.stop(server)?;
420        let _ = self.start(server, spec)?;
421        let state = self.read_state(server)?;
422        self.append_audit_event(AuditEvent {
423            timestamp_epoch_secs: now_epoch_secs(),
424            server: server.to_string(),
425            action: "restart".to_string(),
426            pid: state.pid,
427            command: state.command,
428            args: if state.args.is_empty() {
429                None
430            } else {
431                Some(state.args)
432            },
433        })?;
434        Ok(())
435    }
436
437    /// Runs a tokio-backed supervision loop for one server until stopped.
438    pub fn run_supervisor(&self, server: &str, spec: &ProcessSpec) -> io::Result<()> {
439        let policy = match spec.auto_restart {
440            Some(policy) if policy.enabled => policy,
441            _ => return Ok(()),
442        };
443
444        let runtime = tokio::runtime::Builder::new_current_thread()
445            .enable_time()
446            .build()
447            .map_err(|e| io::Error::other(format!("failed to build tokio runtime: {e}")))?;
448
449        runtime.block_on(self.run_supervisor_loop(server, spec, policy))
450    }
451
    /// Async supervision loop that monitors pid transitions and performs bounded restarts.
    ///
    /// Each outer iteration watches the pid recorded in the persisted state, polling
    /// every 100 ms. The loop returns `Ok(())` when the state is no longer `Running`,
    /// when the recorded pid changes to one this loop did not spawn, when
    /// `policy.max_restarts` is reached, or when org policy denies the server. Otherwise
    /// a dead process is respawned from `spec` and the loop continues with the new pid.
    ///
    /// # Errors
    /// Propagates I/O errors from state/log/audit/policy access and from spawning.
    async fn run_supervisor_loop(
        &self,
        server: &str,
        spec: &ProcessSpec,
        policy: AutoRestartPolicy,
    ) -> io::Result<()> {
        let poll_interval = Duration::from_millis(100);
        // Seed the counter from persisted state; afterwards it is tracked locally.
        let mut restart_attempts = self.read_state(server)?.restart_attempts;

        loop {
            let state = self.read_state(server)?;
            if state.status != ServerStatus::Running {
                return Ok(());
            }

            // Running but no pid recorded yet: wait and re-read.
            let monitored_pid = match state.pid {
                Some(pid) => pid,
                None => {
                    tokio::time::sleep(poll_interval).await;
                    continue;
                }
            };

            // Poll until the monitored process dies, bailing out if the state changes underneath.
            loop {
                if !process_is_alive(monitored_pid) {
                    break;
                }
                tokio::time::sleep(poll_interval).await;
                let latest = self.read_state(server)?;
                if latest.status != ServerStatus::Running {
                    return Ok(());
                }
                if latest.pid != Some(monitored_pid) {
                    // Another process took ownership; this supervisor exits.
                    return Ok(());
                }
            }

            // Re-check ownership after the exit was observed, before recording it.
            let state_after_exit = self.read_state(server)?;
            if state_after_exit.status != ServerStatus::Running {
                return Ok(());
            }
            if state_after_exit.pid != Some(monitored_pid) {
                return Ok(());
            }

            self.append_log(server, "EXIT")?;
            self.append_audit_event(AuditEvent {
                timestamp_epoch_secs: now_epoch_secs(),
                server: server.to_string(),
                action: "exit".to_string(),
                pid: Some(monitored_pid),
                command: state_after_exit.command.clone(),
                args: if state_after_exit.args.is_empty() {
                    None
                } else {
                    Some(state_after_exit.args.clone())
                },
            })?;

            // Restart budget exhausted: persist the stopped state and end supervision.
            if restart_attempts >= policy.max_restarts {
                let mut stopped_state = state_after_exit;
                stopped_state.status = ServerStatus::Stopped;
                stopped_state.pid = None;
                stopped_state.updated_at_epoch_secs = now_epoch_secs();
                stopped_state.restart_attempts = restart_attempts;
                self.write_state(server, &stopped_state)?;
                return Ok(());
            }

            // Org policy is re-checked before every restart.
            if self.server_denied_by_policy(server)? {
                let mut stopped_state = self.read_state(server)?;
                stopped_state.status = ServerStatus::Stopped;
                stopped_state.pid = None;
                stopped_state.updated_at_epoch_secs = now_epoch_secs();
                self.write_state(server, &stopped_state)?;
                self.append_log(server, "POLICY_DENIED_AUTO_RESTART")?;
                self.append_audit_event(AuditEvent {
                    timestamp_epoch_secs: now_epoch_secs(),
                    server: server.to_string(),
                    action: "policy-denied".to_string(),
                    pid: Some(monitored_pid),
                    command: stopped_state.command.clone(),
                    args: if stopped_state.args.is_empty() {
                        None
                    } else {
                        Some(stopped_state.args.clone())
                    },
                })?;
                return Ok(());
            }

            // Last ownership check immediately before spawning a replacement.
            let control_state = self.read_state(server)?;
            if control_state.status != ServerStatus::Running
                || control_state.pid != Some(monitored_pid)
            {
                return Ok(());
            }

            let log_file = self.open_log_append(server)?;
            // stdout and stderr share one log file handle.
            let err_file = log_file.try_clone()?;
            let mut cmd = Command::new(&spec.command);
            cmd.args(&spec.args)
                .envs(&spec.env)
                .stdin(Stdio::null())
                .stdout(Stdio::from(log_file))
                .stderr(Stdio::from(err_file));
            apply_resource_limits(&mut cmd, spec.max_memory_bytes, spec.max_file_descriptors);
            let child = cmd
                .spawn()
                .map_err(|e| io::Error::new(e.kind(), format!("failed to spawn process: {e}")))?;
            let pid = child.id();
            // The child handle is dropped without waiting; only the pid is tracked.
            drop(child);

            // Stop could have raced with this spawn; terminate immediately if so.
            if self.read_state(server)?.status != ServerStatus::Running {
                let _ = terminate_process(pid);
                return Ok(());
            }

            restart_attempts += 1;
            // Publish the new pid; the next outer iteration monitors it.
            let mut restarted_state = self.read_state(server)?;
            restarted_state.status = ServerStatus::Running;
            restarted_state.pid = Some(pid);
            restarted_state.command = Some(spec.command.clone());
            restarted_state.args = spec.args.clone();
            restarted_state.updated_at_epoch_secs = now_epoch_secs();
            restarted_state.restart_attempts = restart_attempts;
            self.write_state(server, &restarted_state)?;
            self.append_log(
                server,
                &format!(
                    "AUTO_RESTART pid={pid} attempt={}/{}",
                    restart_attempts, policy.max_restarts
                ),
            )?;
            self.append_audit_event(AuditEvent {
                timestamp_epoch_secs: now_epoch_secs(),
                server: server.to_string(),
                action: "auto-restart".to_string(),
                pid: Some(pid),
                command: Some(spec.command.clone()),
                args: if spec.args.is_empty() {
                    None
                } else {
                    Some(spec.args.clone())
                },
            })?;
        }
    }
603
604    /// Returns the last `lines` log lines for a server.
605    pub fn tail_logs(&self, server: &str, lines: usize) -> io::Result<Vec<String>> {
606        if lines == 0 {
607            return Ok(Vec::new());
608        }
609
610        let path = self.log_path(server);
611        if !path.exists() {
612            return Ok(Vec::new());
613        }
614
615        let content = fs::read_to_string(path)?;
616        let all: Vec<String> = content.lines().map(ToString::to_string).collect();
617        if all.len() <= lines {
618            return Ok(all);
619        }
620
621        Ok(all[all.len() - lines..].to_vec())
622    }
623
624    /// Appends a custom audit event for non-lifecycle runtime actions.
625    pub fn record_audit_event(
626        &self,
627        server: &str,
628        action: &str,
629        pid: Option<u32>,
630        command: Option<&str>,
631        args: Option<&[String]>,
632    ) -> io::Result<()> {
633        if action.trim().is_empty() {
634            return Err(io::Error::new(
635                io::ErrorKind::InvalidInput,
636                "audit action must not be empty",
637            ));
638        }
639
640        self.append_audit_event(AuditEvent {
641            timestamp_epoch_secs: now_epoch_secs(),
642            server: server.to_string(),
643            action: action.to_string(),
644            pid,
645            command: command.map(ToString::to_string),
646            args: args.filter(|v| !v.is_empty()).map(|v| v.to_vec()),
647        })
648    }
649
650    /// Runtime state directory path.
651    fn runtime_dir(&self) -> PathBuf {
652        self.berth_home.join("runtime")
653    }
654
655    /// Runtime log directory path.
656    fn logs_dir(&self) -> PathBuf {
657        self.berth_home.join("logs")
658    }
659
660    /// Audit log directory path.
661    fn audit_dir(&self) -> PathBuf {
662        self.berth_home.join("audit")
663    }
664
665    /// Per-server state file path.
666    fn state_path(&self, server: &str) -> PathBuf {
667        self.runtime_dir().join(format!("{server}.toml"))
668    }
669
670    /// Per-server log file path.
671    fn log_path(&self, server: &str) -> PathBuf {
672        self.logs_dir().join(format!("{server}.log"))
673    }
674
675    /// JSONL audit log file path.
676    fn audit_log_path(&self) -> PathBuf {
677        self.audit_dir().join("audit.jsonl")
678    }
679
680    /// Org policy file path.
681    fn policy_path(&self) -> PathBuf {
682        self.berth_home.join("policy.toml")
683    }
684
685    /// Returns true when org policy denies this server by name or wildcard.
686    fn server_denied_by_policy(&self, server: &str) -> io::Result<bool> {
687        let policy_path = self.policy_path();
688        if !policy_path.exists() {
689            return Ok(false);
690        }
691
692        let content = fs::read_to_string(&policy_path)?;
693        let policy: RuntimePolicyFile = toml::from_str(&content).map_err(|e| {
694            io::Error::new(
695                io::ErrorKind::InvalidData,
696                format!("failed to parse policy file {}: {e}", policy_path.display()),
697            )
698        })?;
699
700        Ok(policy.servers.deny.iter().any(|entry| {
701            let normalized = entry.trim();
702            normalized == "*" || normalized.eq_ignore_ascii_case(server)
703        }))
704    }
705
706    /// Reads persisted state, defaulting to stopped when missing.
707    fn read_state(&self, server: &str) -> io::Result<RuntimeState> {
708        let path = self.state_path(server);
709        if !path.exists() {
710            return Ok(RuntimeState::default());
711        }
712
713        let content = fs::read_to_string(path)?;
714        toml::from_str(&content).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
715    }
716
717    /// Persists a server runtime state as TOML.
718    fn write_state(&self, server: &str, state: &RuntimeState) -> io::Result<()> {
719        fs::create_dir_all(self.runtime_dir())?;
720        let serialized = toml::to_string_pretty(state)
721            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
722        fs::write(self.state_path(server), serialized)
723    }
724
725    /// Appends one lifecycle event line to a server log file.
726    fn append_log(&self, server: &str, event: &str) -> io::Result<()> {
727        let mut file = self.open_log_append(server)?;
728        writeln!(file, "[{}] {}", now_epoch_secs(), event)
729    }
730
731    /// Opens the server log file in append mode, creating it if needed.
732    fn open_log_append(&self, server: &str) -> io::Result<std::fs::File> {
733        fs::create_dir_all(self.logs_dir())?;
734        OpenOptions::new()
735            .create(true)
736            .append(true)
737            .open(self.log_path(server))
738    }
739
740    /// Appends one audit event as JSONL.
741    fn append_audit_event(&self, event: AuditEvent) -> io::Result<()> {
742        fs::create_dir_all(self.audit_dir())?;
743        let json = serde_json::to_string(&event)
744            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
745        let mut file = OpenOptions::new()
746            .create(true)
747            .append(true)
748            .open(self.audit_log_path())?;
749        writeln!(file, "{json}")
750    }
751}
752
/// Seconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reports a time before the epoch.
fn now_epoch_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
760
/// Applies resource limits (memory + file descriptors) to a command via `pre_exec`.
///
/// `max_memory_bytes` is installed as `RLIMIT_AS` and `max_file_descriptors` as
/// `RLIMIT_NOFILE`; each sets the soft and hard limit to the same value. When both
/// are `None` the command is left untouched. A failing `setrlimit` call makes the
/// hook return the OS error.
#[cfg(unix)]
fn apply_resource_limits(
    cmd: &mut Command,
    max_memory_bytes: Option<u64>,
    max_file_descriptors: Option<u64>,
) {
    use std::os::unix::process::CommandExt;

    // Skip registering a hook entirely when there is nothing to limit.
    if max_memory_bytes.is_none() && max_file_descriptors.is_none() {
        return;
    }

    // SAFETY: the hook captures only two `Option<u64>` values by move and does nothing
    // beyond building `rlimit` structs, calling `setrlimit`, and reading the last OS error.
    // NOTE(review): confirm these calls are acceptable inside a `pre_exec` hook on
    // every supported target.
    unsafe {
        cmd.pre_exec(move || {
            if let Some(bytes) = max_memory_bytes {
                // NOTE(review): assigning a `u64` assumes `libc::rlim_t` is `u64` on the
                // target — confirm for every supported Unix platform.
                let rlim = libc::rlimit {
                    rlim_cur: bytes,
                    rlim_max: bytes,
                };
                if libc::setrlimit(libc::RLIMIT_AS, &rlim) != 0 {
                    return Err(io::Error::last_os_error());
                }
            }
            if let Some(n) = max_file_descriptors {
                let rlim = libc::rlimit {
                    rlim_cur: n,
                    rlim_max: n,
                };
                if libc::setrlimit(libc::RLIMIT_NOFILE, &rlim) != 0 {
                    return Err(io::Error::last_os_error());
                }
            }
            Ok(())
        });
    }
}
798
/// No-op on non-Unix platforms.
///
/// The limits are accepted so the signature matches the Unix implementation, and
/// are ignored.
#[cfg(not(unix))]
fn apply_resource_limits(
    _cmd: &mut Command,
    _max_memory_bytes: Option<u64>,
    _max_file_descriptors: Option<u64>,
) {
}
807
/// Reports whether `pid` refers to a live, non-zombie process.
///
/// Asks `ps` for the process state first. A failed lookup, empty output, or a state
/// starting with `Z` (zombie) counts as not alive. If `ps` itself cannot be run, the
/// result of `kill -0 <pid>` is used instead.
#[cfg(unix)]
fn process_is_alive(pid: u32) -> bool {
    let pid_arg = pid.to_string();

    match Command::new("ps")
        .args(["-o", "stat=", "-p", pid_arg.as_str()])
        .output()
    {
        Ok(probe) => {
            let raw_state = String::from_utf8_lossy(&probe.stdout);
            let state = raw_state.trim();
            // Zombie processes are dead for supervision purposes.
            probe.status.success() && !state.is_empty() && !state.starts_with('Z')
        }
        Err(_) => Command::new("kill")
            .arg("-0")
            .arg(&pid_arg)
            .status()
            .is_ok_and(|s| s.success()),
    }
}
836
/// Reports whether `pid` shows up in `tasklist` output filtered to that pid.
///
/// A failure to run `tasklist`, or a non-success exit, counts as not alive.
#[cfg(windows)]
fn process_is_alive(pid: u32) -> bool {
    let filter = format!("PID eq {pid}");
    let Ok(listing) = Command::new("cmd")
        .args(["/C", "tasklist", "/FI", &filter])
        .output()
    else {
        return false;
    };
    if !listing.status.success() {
        return false;
    }

    let needle = pid.to_string();
    String::from_utf8_lossy(&listing.stdout).contains(&needle)
}
850
/// Returns whether a process is currently alive.
///
/// Fallback for targets that are neither Unix nor Windows: no liveness probe is
/// implemented, so every pid is reported as not alive.
#[cfg(not(any(unix, windows)))]
fn process_is_alive(_pid: u32) -> bool {
    false
}
856
857/// Sends a termination signal to a process.
858#[cfg(unix)]
859fn terminate_process(pid: u32) -> io::Result<()> {
860    let pid_str = pid.to_string();
861    let status = Command::new("kill").arg(&pid_str).status()?;
862    if !status.success() {
863        return Err(io::Error::new(
864            io::ErrorKind::PermissionDenied,
865            format!("failed to signal process {pid}"),
866        ));
867    }
868
869    if wait_for_process_exit(pid, 50, Duration::from_millis(20)) {
870        return Ok(());
871    }
872
873    // Escalate if the process does not exit after TERM.
874    let kill_status = Command::new("kill").args(["-9", &pid_str]).status()?;
875    if kill_status.success() {
876        Ok(())
877    } else {
878        Err(io::Error::new(
879            io::ErrorKind::PermissionDenied,
880            format!("failed to force terminate process {pid}"),
881        ))
882    }
883}
884
/// Stops the process tree rooted at `pid`, escalating to a forced kill when needed.
///
/// First runs `taskkill /PID <pid> /T`. If that request succeeds and the process is
/// gone within 50 liveness checks spaced 20 ms apart, the function returns. Otherwise
/// (the request was refused or ignored) it runs `taskkill /PID <pid> /T /F`.
///
/// A process that has already exited by the time the forced request fails is treated
/// as successfully stopped.
///
/// # Errors
///
/// * `PermissionDenied` if the forced request fails while the process is still alive.
/// * Any error produced while spawning `taskkill`.
#[cfg(windows)]
fn terminate_process(pid: u32) -> io::Result<()> {
    let pid_arg = pid.to_string();

    // Try graceful shutdown first, then force-kill if needed.
    let graceful = Command::new("taskkill")
        .args(["/PID", &pid_arg, "/T"])
        .status()?;
    if graceful.success() && wait_for_process_exit(pid, 50, Duration::from_millis(20)) {
        return Ok(());
    }

    // A refused non-forced request must not short-circuit the escalation: fall through
    // to `/F` whether the first attempt failed or was simply ignored.
    let forced = Command::new("taskkill")
        .args(["/PID", &pid_arg, "/T", "/F"])
        .status()?;

    // Zero polling attempts turns the wait helper into one immediate liveness check; a
    // target that is already gone makes `taskkill` fail even though the goal is met.
    if forced.success() || wait_for_process_exit(pid, 0, Duration::ZERO) {
        Ok(())
    } else {
        Err(io::Error::new(
            io::ErrorKind::PermissionDenied,
            format!("failed to force terminate process {pid}"),
        ))
    }
}
915
/// Termination stub for targets that are neither Unix nor Windows.
///
/// # Errors
///
/// Always returns an `Unsupported` error; no process is signalled.
#[cfg(not(any(unix, windows)))]
fn terminate_process(_pid: u32) -> io::Result<()> {
    let unsupported = io::Error::new(
        io::ErrorKind::Unsupported,
        "process termination is not supported on this platform",
    );
    Err(unsupported)
}
924
925/// Waits for a process to exit, checking liveness repeatedly.
926fn wait_for_process_exit(pid: u32, attempts: u32, interval: Duration) -> bool {
927    for _ in 0..attempts {
928        if !process_is_alive(pid) {
929            return true;
930        }
931        thread::sleep(interval);
932    }
933    !process_is_alive(pid)
934}
935
936#[cfg(test)]
937mod tests {
938    use super::*;
939    use std::thread;
940    use std::time::Duration;
941
942    fn manager() -> (tempfile::TempDir, RuntimeManager) {
943        let tmp = tempfile::tempdir().unwrap();
944        let manager = RuntimeManager::new(tmp.path().join(".berth"));
945        (tmp, manager)
946    }
947
948    fn wait_until_process_exits(manager: &RuntimeManager, server: &str) {
949        for _ in 0..100 {
950            let state = manager.read_state(server).unwrap();
951            match state.pid {
952                Some(pid) if process_is_alive(pid) => thread::sleep(Duration::from_millis(20)),
953                _ => return,
954            }
955        }
956    }
957
958    fn write_policy_deny_server(tmp: &tempfile::TempDir, server: &str) {
959        fs::create_dir_all(tmp.path().join(".berth")).unwrap();
960        fs::write(
961            tmp.path().join(".berth").join("policy.toml"),
962            format!("[servers]\ndeny = [\"{server}\"]\n"),
963        )
964        .unwrap();
965    }
966
967    #[cfg(unix)]
968    fn long_running_spec() -> ProcessSpec {
969        ProcessSpec {
970            command: "sh".to_string(),
971            args: vec!["-c".to_string(), "sleep 60".to_string()],
972            env: BTreeMap::new(),
973            auto_restart: None,
974            ..Default::default()
975        }
976    }
977
978    #[cfg(windows)]
979    fn long_running_spec() -> ProcessSpec {
980        ProcessSpec {
981            command: "cmd".to_string(),
982            args: vec![
983                "/C".to_string(),
984                "timeout".to_string(),
985                "/T".to_string(),
986                "60".to_string(),
987                "/NOBREAK".to_string(),
988            ],
989            env: BTreeMap::new(),
990            auto_restart: None,
991            ..Default::default()
992        }
993    }
994
995    #[cfg(not(any(unix, windows)))]
996    fn long_running_spec() -> ProcessSpec {
997        ProcessSpec {
998            command: "unsupported".to_string(),
999            args: vec![],
1000            env: BTreeMap::new(),
1001            auto_restart: None,
1002            ..Default::default()
1003        }
1004    }
1005
1006    #[cfg(unix)]
1007    fn ignores_term_spec() -> ProcessSpec {
1008        ProcessSpec {
1009            command: "sh".to_string(),
1010            args: vec![
1011                "-c".to_string(),
1012                "trap '' TERM; while true; do sleep 1; done".to_string(),
1013            ],
1014            env: BTreeMap::new(),
1015            auto_restart: None,
1016            ..Default::default()
1017        }
1018    }
1019
1020    #[cfg(unix)]
1021    fn crash_spec_with_policy(max_restarts: u32) -> ProcessSpec {
1022        ProcessSpec {
1023            command: "sh".to_string(),
1024            args: vec!["-c".to_string(), "exit 1".to_string()],
1025            env: BTreeMap::new(),
1026            auto_restart: Some(AutoRestartPolicy {
1027                enabled: true,
1028                max_restarts,
1029            }),
1030            ..Default::default()
1031        }
1032    }
1033
1034    #[cfg(windows)]
1035    fn crash_spec_with_policy(max_restarts: u32) -> ProcessSpec {
1036        ProcessSpec {
1037            command: "cmd".to_string(),
1038            args: vec!["/C".to_string(), "exit /B 1".to_string()],
1039            env: BTreeMap::new(),
1040            auto_restart: Some(AutoRestartPolicy {
1041                enabled: true,
1042                max_restarts,
1043            }),
1044            ..Default::default()
1045        }
1046    }
1047
1048    #[cfg(not(any(unix, windows)))]
1049    fn crash_spec_with_policy(max_restarts: u32) -> ProcessSpec {
1050        ProcessSpec {
1051            command: "unsupported".to_string(),
1052            args: vec![],
1053            env: BTreeMap::new(),
1054            auto_restart: Some(AutoRestartPolicy {
1055                enabled: true,
1056                max_restarts,
1057            }),
1058            ..Default::default()
1059        }
1060    }
1061
1062    #[cfg(unix)]
1063    fn fail_once_then_run_spec(marker_path: &str, max_restarts: u32) -> ProcessSpec {
1064        ProcessSpec {
1065            command: "sh".to_string(),
1066            args: vec![
1067                "-c".to_string(),
1068                format!(
1069                    "if [ -f '{marker_path}' ]; then sleep 60; else touch '{marker_path}'; exit 1; fi"
1070                ),
1071            ],
1072            env: BTreeMap::new(),
1073            auto_restart: Some(AutoRestartPolicy {
1074                enabled: true,
1075                max_restarts,
1076            }),
1077            ..Default::default()
1078        }
1079    }
1080
1081    #[cfg(windows)]
1082    fn fail_once_then_run_spec(marker_path: &str, max_restarts: u32) -> ProcessSpec {
1083        ProcessSpec {
1084            command: "cmd".to_string(),
1085            args: vec![
1086                "/C".to_string(),
1087                format!(
1088                    "if exist \"{marker_path}\" (timeout /T 60 /NOBREAK >NUL) else (type nul > \"{marker_path}\" & exit /B 1)"
1089                ),
1090            ],
1091            env: BTreeMap::new(),
1092            auto_restart: Some(AutoRestartPolicy {
1093                enabled: true,
1094                max_restarts,
1095            }),
1096            ..Default::default()
1097        }
1098    }
1099
1100    #[cfg(not(any(unix, windows)))]
1101    fn fail_once_then_run_spec(_marker_path: &str, max_restarts: u32) -> ProcessSpec {
1102        ProcessSpec {
1103            command: "unsupported".to_string(),
1104            args: vec![],
1105            env: BTreeMap::new(),
1106            auto_restart: Some(AutoRestartPolicy {
1107                enabled: true,
1108                max_restarts,
1109            }),
1110            ..Default::default()
1111        }
1112    }
1113
1114    #[test]
1115    fn version_is_set() {
1116        assert!(!version().is_empty());
1117    }
1118
1119    #[test]
1120    fn missing_state_defaults_to_stopped() {
1121        let (_tmp, manager) = manager();
1122        let status = manager.status("github").unwrap();
1123        assert_eq!(status, ServerStatus::Stopped);
1124    }
1125
1126    #[test]
1127    fn start_transitions_to_running() {
1128        let (_tmp, manager) = manager();
1129        let spec = long_running_spec();
1130        let outcome = manager.start("github", &spec).unwrap();
1131        assert_eq!(outcome, StartOutcome::Started);
1132        assert_eq!(manager.status("github").unwrap(), ServerStatus::Running);
1133        let _ = manager.stop("github");
1134    }
1135
1136    #[test]
1137    fn starting_running_server_reports_already_running() {
1138        let (_tmp, manager) = manager();
1139        let spec = long_running_spec();
1140        manager.start("github", &spec).unwrap();
1141        let outcome = manager.start("github", &spec).unwrap();
1142        assert_eq!(outcome, StartOutcome::AlreadyRunning);
1143        let _ = manager.stop("github");
1144    }
1145
1146    #[test]
1147    fn stop_transitions_to_stopped() {
1148        let (_tmp, manager) = manager();
1149        let spec = long_running_spec();
1150        manager.start("github", &spec).unwrap();
1151        let outcome = manager.stop("github").unwrap();
1152        assert_eq!(outcome, StopOutcome::Stopped);
1153        assert_eq!(manager.status("github").unwrap(), ServerStatus::Stopped);
1154    }
1155
1156    #[cfg(unix)]
1157    #[test]
1158    fn stop_escalates_when_process_ignores_term() {
1159        let (_tmp, manager) = manager();
1160        let spec = ignores_term_spec();
1161        manager.start("github", &spec).unwrap();
1162        let outcome = manager.stop("github").unwrap();
1163        assert_eq!(outcome, StopOutcome::Stopped);
1164        assert_eq!(manager.status("github").unwrap(), ServerStatus::Stopped);
1165    }
1166
1167    #[test]
1168    fn stopping_stopped_server_reports_already_stopped() {
1169        let (_tmp, manager) = manager();
1170        let outcome = manager.stop("github").unwrap();
1171        assert_eq!(outcome, StopOutcome::AlreadyStopped);
1172    }
1173
1174    #[test]
1175    fn restart_ends_in_running_state() {
1176        let (_tmp, manager) = manager();
1177        let spec = long_running_spec();
1178        manager.restart("github", &spec).unwrap();
1179        assert_eq!(manager.status("github").unwrap(), ServerStatus::Running);
1180        let _ = manager.stop("github");
1181    }
1182
1183    #[test]
1184    fn tail_logs_returns_last_lines() {
1185        let (_tmp, manager) = manager();
1186        let spec = long_running_spec();
1187        manager.start("github", &spec).unwrap();
1188        manager.stop("github").unwrap();
1189        manager.start("github", &spec).unwrap();
1190        let _ = manager.stop("github");
1191
1192        let lines = manager.tail_logs("github", 2).unwrap();
1193        assert_eq!(lines.len(), 2);
1194        assert!(lines.iter().any(|l| l.contains("STOP")));
1195        assert!(lines.iter().any(|l| l.contains("START")));
1196    }
1197
1198    #[test]
1199    fn start_stop_writes_audit_events() {
1200        let (_tmp, manager) = manager();
1201        let spec = long_running_spec();
1202        manager.start("github", &spec).unwrap();
1203        manager.stop("github").unwrap();
1204
1205        let audit_path = manager.audit_log_path();
1206        let content = fs::read_to_string(audit_path).unwrap();
1207        let lines: Vec<&str> = content.lines().collect();
1208        assert!(lines.iter().any(|l| l.contains("\"action\":\"start\"")));
1209        assert!(lines.iter().any(|l| l.contains("\"action\":\"stop\"")));
1210    }
1211
1212    #[test]
1213    fn malformed_state_file_returns_error() {
1214        let (tmp, manager) = manager();
1215        let runtime_dir = tmp.path().join(".berth/runtime");
1216        fs::create_dir_all(&runtime_dir).unwrap();
1217        fs::write(runtime_dir.join("github.toml"), "not = [valid").unwrap();
1218
1219        let err = manager.status("github").unwrap_err();
1220        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
1221    }
1222
1223    #[test]
1224    fn status_with_spec_auto_restarts_crashed_process() {
1225        let (_tmp, manager) = manager();
1226        let crash = crash_spec_with_policy(1);
1227        let recover = long_running_spec();
1228        manager.start("github", &crash).unwrap();
1229        wait_until_process_exits(&manager, "github");
1230
1231        let status = manager.status_with_spec("github", Some(&recover)).unwrap();
1232        assert_eq!(status, ServerStatus::Running);
1233
1234        let audit = fs::read_to_string(manager.audit_log_path()).unwrap();
1235        assert!(audit.contains("\"action\":\"auto-restart\""));
1236        let _ = manager.stop("github");
1237    }
1238
1239    #[test]
1240    fn status_with_spec_does_not_restart_when_server_denied_by_policy() {
1241        let (tmp, manager) = manager();
1242        let crash = crash_spec_with_policy(1);
1243        let recover = long_running_spec();
1244        manager.start("github", &crash).unwrap();
1245        wait_until_process_exits(&manager, "github");
1246        write_policy_deny_server(&tmp, "github");
1247
1248        let status = manager.status_with_spec("github", Some(&recover)).unwrap();
1249        assert_eq!(status, ServerStatus::Stopped);
1250
1251        let audit = fs::read_to_string(manager.audit_log_path()).unwrap();
1252        assert!(audit.contains("\"action\":\"policy-denied\""));
1253        assert!(!audit.contains("\"action\":\"auto-restart\""));
1254    }
1255
1256    #[test]
1257    fn auto_restart_respects_max_restarts_bound() {
1258        let (_tmp, manager) = manager();
1259        let crash = crash_spec_with_policy(1);
1260        manager.start("github", &crash).unwrap();
1261        wait_until_process_exits(&manager, "github");
1262
1263        let _ = manager.status_with_spec("github", Some(&crash)).unwrap();
1264        wait_until_process_exits(&manager, "github");
1265        let second = manager.status_with_spec("github", Some(&crash)).unwrap();
1266        assert_eq!(second, ServerStatus::Stopped);
1267
1268        let audit = fs::read_to_string(manager.audit_log_path()).unwrap();
1269        let count = audit
1270            .lines()
1271            .filter(|l| l.contains("\"action\":\"auto-restart\""))
1272            .count();
1273        assert_eq!(count, 1);
1274    }
1275
1276    #[cfg(any(unix, windows))]
1277    #[test]
1278    fn tokio_supervisor_recovers_crash_without_status_polling() {
1279        let (tmp, manager) = manager();
1280        let marker = tmp.path().join(".berth/runtime/github.restart-flag");
1281        fs::create_dir_all(marker.parent().unwrap()).unwrap();
1282        let marker_path = marker.to_string_lossy().to_string();
1283
1284        let supervisor_spec = fail_once_then_run_spec(&marker_path, 1);
1285        let mut start_spec = supervisor_spec.clone();
1286        start_spec.auto_restart = None;
1287        manager.start("github", &start_spec).unwrap();
1288
1289        let supervisor_manager = RuntimeManager::new(tmp.path().join(".berth"));
1290        let thread_spec = supervisor_spec.clone();
1291        let handle =
1292            thread::spawn(move || supervisor_manager.run_supervisor("github", &thread_spec));
1293
1294        let mut recovered = false;
1295        for _ in 0..200 {
1296            let state = manager.read_state("github").unwrap();
1297            if state.restart_attempts == 1 && state.pid.is_some_and(process_is_alive) {
1298                recovered = true;
1299                break;
1300            }
1301            thread::sleep(Duration::from_millis(20));
1302        }
1303        assert!(recovered);
1304
1305        let _ = manager.stop("github");
1306        handle.join().unwrap().unwrap();
1307    }
1308
1309    #[cfg(any(unix, windows))]
1310    #[test]
1311    fn tokio_supervisor_does_not_restart_when_server_denied_by_policy() {
1312        let (tmp, manager) = manager();
1313        let mut start_spec = crash_spec_with_policy(1);
1314        start_spec.auto_restart = None;
1315        manager.start("github", &start_spec).unwrap();
1316        wait_until_process_exits(&manager, "github");
1317        write_policy_deny_server(&tmp, "github");
1318
1319        let supervisor_spec = crash_spec_with_policy(1);
1320        manager.run_supervisor("github", &supervisor_spec).unwrap();
1321
1322        let state = manager.read_state("github").unwrap();
1323        assert_eq!(state.status, ServerStatus::Stopped);
1324        assert_eq!(state.pid, None);
1325
1326        let audit = fs::read_to_string(manager.audit_log_path()).unwrap();
1327        assert!(audit.contains("\"action\":\"policy-denied\""));
1328        assert!(!audit.contains("\"action\":\"auto-restart\""));
1329    }
1330}