Skip to main content

ferridriver_script/
session_procs.rs

//! Process execution for the `commands` capability.
//!
//! One-shot: [`run_oneshot`] spawns, bounds wall-clock and output, kills
//! the whole process group on timeout (or on output overflow / read
//! failure), and shapes stdout per the declared [`CommandOutput`] mode.
//!
//! Persistent: [`SessionProcs`] keeps long-running children (a dev
//! server, a watcher) alive across VM rebuilds. It lives in the durable
//! session tier, so `Drop` (idle-TTL reap / explicit close / shutdown)
//! SIGKILLs every process group — a session can never leak a server.
//!
//! Every child is its own process group (`setsid` in `pre_exec`) so a
//! shell pipeline dies whole, not just its leader. The environment is
//! scrubbed to `PATH` plus the spec's declared passthrough names — a
//! command never inherits ambient server secrets.
16
17use std::collections::HashMap;
18use std::process::Stdio;
19use std::sync::{Arc, Mutex};
20use std::time::{Duration, Instant};
21
22use tokio::io::AsyncReadExt;
23use tokio::process::Command;
24
25use crate::command_spec::{CommandOutput, ResolvedCommand, ResolvedExec};
26
/// Hard wall-clock ceiling (milliseconds) applied to a one-shot command
/// whose spec omits `timeoutMs`. The per-script interrupt-handler timeout
/// cannot fire while the script is parked in this native await, so without
/// a default a hung child (stuck on a resource, looping forever) would pin
/// the calling script indefinitely. An explicit `timeoutMs` always wins.
const DEFAULT_ONESHOT_TIMEOUT_MS: u64 = 120_000;

/// Largest stdout or stderr capture, in bytes, a one-shot command may
/// produce; exceeding it on either stream fails the run.
const OUTPUT_CAP: usize = 8 * 1024 * 1024;
/// Bytes of trailing stdout/stderr retained per persistent process and
/// reported back by its `status`.
const RING_CAP: usize = 64 * 1024;

/// Upper bound on persistent processes one session may have alive at once.
const MAX_PERSISTENT: usize = 16;
40
41fn configure(cmd: &mut Command, rc: &ResolvedCommand) {
42  cmd.env_clear();
43  if let Some(path) = std::env::var_os("PATH") {
44    cmd.env("PATH", path);
45  }
46  for name in &rc.env {
47    if let Some(val) = std::env::var_os(name) {
48      cmd.env(name, val);
49    }
50  }
51  if let Some(dir) = &rc.cwd {
52    cmd.current_dir(dir);
53  }
54  cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped());
55  // New session => child is its own process-group leader (pgid == pid),
56  // so `kill(-pid)` reaps the whole pipeline. SAFETY: `setsid` is
57  // async-signal-safe and the only call in the pre_exec hook.
58  #[allow(unsafe_code)]
59  unsafe {
60    cmd.pre_exec(|| {
61      libc::setsid();
62      Ok(())
63    });
64  }
65}
66
67fn build(rc: &ResolvedCommand) -> Command {
68  let mut cmd = match &rc.exec {
69    ResolvedExec::Shell(line) => {
70      let mut c = Command::new("sh");
71      c.arg("-c").arg(line);
72      c
73    },
74    ResolvedExec::Argv(argv) => {
75      // Argv is non-empty (deserialization enforces it); be defensive.
76      let mut c = Command::new(argv.first().map_or("true", String::as_str));
77      c.args(argv.iter().skip(1));
78      c
79    },
80  };
81  configure(&mut cmd, rc);
82  cmd
83}
84
/// Convert an optional `u32` child pid into the `i32` that `libc` calls
/// take, yielding `0` when there is no pid or it does not fit in `i32`.
/// `kill_group` treats `0` as "nothing to kill".
fn pid_of(id: Option<u32>) -> i32 {
  match id.map(i32::try_from) {
    Some(Ok(pid)) => pid,
    _ => 0,
  }
}
88
89/// SIGKILL the process group led by `pid` (best-effort).
90fn kill_group(pid: i32) {
91  if pid > 0 {
92    #[allow(unsafe_code)]
93    unsafe {
94      libc::kill(-pid, libc::SIGKILL);
95    }
96  }
97}
98
99/// Read up to `cap` bytes; `Err` if the stream exceeds it (the process
100/// group is killed by the caller).
101async fn read_capped<R: tokio::io::AsyncRead + Unpin>(mut r: R, cap: usize) -> Result<Vec<u8>, String> {
102  let mut buf = Vec::new();
103  let mut chunk = [0u8; 8192];
104  loop {
105    let n = r
106      .read(&mut chunk)
107      .await
108      .map_err(|e| format!("read child output: {e}"))?;
109    if n == 0 {
110      break;
111    }
112    if buf.len() + n > cap {
113      return Err(format!("command output exceeded {cap} bytes"));
114    }
115    buf.extend_from_slice(&chunk[..n]);
116  }
117  Ok(buf)
118}
119
120fn shape(stdout: &[u8], mode: CommandOutput) -> Result<serde_json::Value, String> {
121  let s = String::from_utf8_lossy(stdout);
122  let t = s.trim();
123  match mode {
124    CommandOutput::Text => Ok(if t.is_empty() {
125      serde_json::Value::Null
126    } else {
127      serde_json::Value::String(t.to_string())
128    }),
129    CommandOutput::Json => {
130      if t.is_empty() {
131        return Ok(serde_json::Value::Null);
132      }
133      serde_json::from_str(t).map_err(|e| format!("command output is not valid JSON: {e}"))
134    },
135    CommandOutput::Lines => Ok(serde_json::Value::Array(
136      t.lines()
137        .map(str::trim)
138        .filter(|l| !l.is_empty())
139        .map(|l| serde_json::Value::String(l.to_string()))
140        .collect(),
141    )),
142  }
143}
144
145/// Run a one-shot command to completion. Errors on non-zero exit
146/// (message carries stderr), timeout, or output past the cap.
147pub async fn run_oneshot(rc: &ResolvedCommand) -> Result<serde_json::Value, String> {
148  if rc.persistent {
149    return Err("this command is declared `persistent`: use commands.start/status/stop, not run".to_string());
150  }
151  let mut child = build(rc).spawn().map_err(|e| format!("spawn command: {e}"))?;
152  let pid = pid_of(child.id());
153  let out = child.stdout.take().ok_or("no stdout pipe")?;
154  let err = child.stderr.take().ok_or("no stderr pipe")?;
155
156  let work = Box::pin(async move {
157    let (o, e) = tokio::join!(read_capped(out, OUTPUT_CAP), read_capped(err, OUTPUT_CAP));
158    let status = child.wait().await.map_err(|e| format!("wait child: {e}"))?;
159    Ok::<_, String>((o?, e?, status))
160  });
161
162  // An explicit `timeoutMs` is honoured as-is; an unset one still gets
163  // a hard default so a hung one-shot can never block the session
164  // indefinitely.
165  let ms = rc.timeout_ms.unwrap_or(DEFAULT_ONESHOT_TIMEOUT_MS);
166  let (stdout, stderr, status) = {
167    let Ok(r) = tokio::time::timeout(Duration::from_millis(ms), work).await else {
168      kill_group(pid);
169      return Err(format!("command timed out after {ms}ms"));
170    };
171    r.inspect_err(|_| kill_group(pid))?
172  };
173
174  if !status.success() {
175    let code = status.code().map_or_else(|| "signal".to_string(), |c| c.to_string());
176    let msg = String::from_utf8_lossy(&stderr);
177    return Err(format!("command failed (exit {code}): {}", msg.trim()));
178  }
179  shape(&stdout, rc.output)
180}
181
182/// A bounded tail of a stream — only the last [`RING_CAP`] bytes.
183#[derive(Default)]
184struct Ring(Vec<u8>);
185impl Ring {
186  fn push(&mut self, b: &[u8]) {
187    self.0.extend_from_slice(b);
188    if self.0.len() > RING_CAP {
189      let cut = self.0.len() - RING_CAP;
190      self.0.drain(..cut);
191    }
192  }
193  fn text(&self) -> String {
194    String::from_utf8_lossy(&self.0).into_owned()
195  }
196}
197
198struct Proc {
199  pid: i32,
200  started: Instant,
201  stdout: Arc<Mutex<Ring>>,
202  stderr: Arc<Mutex<Ring>>,
203  /// Set by the reaper task once the child exits.
204  exit: Arc<Mutex<Option<i32>>>,
205}
206
207/// Per-session persistent-process registry. Owned by the durable
208/// session tier; `Drop` kills every process group.
209pub struct SessionProcs {
210  inner: Mutex<HashMap<String, Proc>>,
211}
212
213impl Default for SessionProcs {
214  fn default() -> Self {
215    Self {
216      inner: Mutex::new(HashMap::new()),
217    }
218  }
219}
220
221impl SessionProcs {
222  /// Start (or no-op if already running) a persistent command. Returns
223  /// the pid.
224  pub fn start(&self, name: &str, rc: &ResolvedCommand) -> Result<i32, String> {
225    if !rc.persistent {
226      return Err("this command is not declared `persistent`: use commands.run".to_string());
227    }
228    let mut map = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
229    if let Some(p) = map.get(name)
230      && p
231        .exit
232        .lock()
233        .unwrap_or_else(std::sync::PoisonError::into_inner)
234        .is_none()
235    {
236      return Ok(p.pid); // already running — idempotent
237    }
238    map.retain(|_, p| {
239      let alive = p
240        .exit
241        .lock()
242        .unwrap_or_else(std::sync::PoisonError::into_inner)
243        .is_none();
244      if !alive {
245        kill_group(p.pid);
246      }
247      alive
248    });
249    if map.len() >= MAX_PERSISTENT {
250      return Err(format!(
251        "too many persistent processes (max {MAX_PERSISTENT}) for this session"
252      ));
253    }
254
255    let mut child = build(rc).spawn().map_err(|e| format!("spawn command: {e}"))?;
256    let pid = pid_of(child.id());
257    let stdout = Arc::new(Mutex::new(Ring::default()));
258    let stderr = Arc::new(Mutex::new(Ring::default()));
259    let exit = Arc::new(Mutex::new(None));
260
261    if let Some(o) = child.stdout.take() {
262      pump(o, stdout.clone());
263    }
264    if let Some(e) = child.stderr.take() {
265      pump(e, stderr.clone());
266    }
267    let exit_w = exit.clone();
268    tokio::spawn(async move {
269      let code = child.wait().await.ok().and_then(|s| s.code()).unwrap_or(-1);
270      *exit_w.lock().unwrap_or_else(std::sync::PoisonError::into_inner) = Some(code);
271    });
272
273    map.insert(
274      name.to_string(),
275      Proc {
276        pid,
277        started: Instant::now(),
278        stdout,
279        stderr,
280        exit,
281      },
282    );
283    Ok(pid)
284  }
285
286  pub fn status(&self, name: &str) -> Result<serde_json::Value, String> {
287    let map = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
288    let p = map
289      .get(name)
290      .ok_or_else(|| format!("no persistent process `{name}` started in this session"))?;
291    let exit = *p.exit.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
292    Ok(serde_json::json!({
293      "name": name,
294      "pid": p.pid,
295      "running": exit.is_none(),
296      "exitCode": exit,
297      "uptimeMs": p.started.elapsed().as_millis() as u64,
298      "stdout": p.stdout.lock().unwrap_or_else(std::sync::PoisonError::into_inner).text(),
299      "stderr": p.stderr.lock().unwrap_or_else(std::sync::PoisonError::into_inner).text(),
300    }))
301  }
302
303  pub fn stop(&self, name: &str) -> Result<(), String> {
304    let mut map = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
305    match map.remove(name) {
306      Some(p) => {
307        kill_group(p.pid);
308        Ok(())
309      },
310      None => Err(format!("no persistent process `{name}` to stop")),
311    }
312  }
313}
314
315impl Drop for SessionProcs {
316  fn drop(&mut self) {
317    if let Ok(map) = self.inner.lock() {
318      for p in map.values() {
319        kill_group(p.pid);
320      }
321    }
322  }
323}
324
325fn pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(mut r: R, ring: Arc<Mutex<Ring>>) {
326  tokio::spawn(async move {
327    let mut chunk = [0u8; 8192];
328    loop {
329      match r.read(&mut chunk).await {
330        Ok(0) | Err(_) => break,
331        Ok(n) => ring
332          .lock()
333          .unwrap_or_else(std::sync::PoisonError::into_inner)
334          .push(&chunk[..n]),
335      }
336    }
337  });
338}