ferridriver_script/
session_procs.rs1use std::collections::HashMap;
18use std::process::Stdio;
19use std::sync::{Arc, Mutex};
20use std::time::{Duration, Instant};
21
22use tokio::io::AsyncReadExt;
23use tokio::process::Command;
24
25use crate::command_spec::{CommandOutput, ResolvedCommand, ResolvedExec};
26
/// Timeout applied to a one-shot command when its spec carries no `timeout_ms` (two minutes).
const DEFAULT_ONESHOT_TIMEOUT_MS: u64 = 2 * 60 * 1_000;

/// Maximum number of bytes captured from each output stream of a one-shot command (8 MiB).
const OUTPUT_CAP: usize = 8 << 20;
/// Number of most-recent bytes retained per output stream of a persistent process (64 KiB).
const RING_CAP: usize = 64 << 10;
/// Maximum number of running persistent processes one `SessionProcs` will track.
const MAX_PERSISTENT: usize = 16;
40
41fn configure(cmd: &mut Command, rc: &ResolvedCommand) {
42 cmd.env_clear();
43 if let Some(path) = std::env::var_os("PATH") {
44 cmd.env("PATH", path);
45 }
46 for name in &rc.env {
47 if let Some(val) = std::env::var_os(name) {
48 cmd.env(name, val);
49 }
50 }
51 if let Some(dir) = &rc.cwd {
52 cmd.current_dir(dir);
53 }
54 cmd.stdin(Stdio::null()).stdout(Stdio::piped()).stderr(Stdio::piped());
55 #[allow(unsafe_code)]
59 unsafe {
60 cmd.pre_exec(|| {
61 libc::setsid();
62 Ok(())
63 });
64 }
65}
66
67fn build(rc: &ResolvedCommand) -> Command {
68 let mut cmd = match &rc.exec {
69 ResolvedExec::Shell(line) => {
70 let mut c = Command::new("sh");
71 c.arg("-c").arg(line);
72 c
73 },
74 ResolvedExec::Argv(argv) => {
75 let mut c = Command::new(argv.first().map_or("true", String::as_str));
77 c.args(argv.iter().skip(1));
78 c
79 },
80 };
81 configure(&mut cmd, rc);
82 cmd
83}
84
/// Converts an optional OS process id into the signed form used for signalling.
///
/// Returns `0` when the id is absent or does not fit in an `i32`; `kill_group`
/// treats non-positive values as "nothing to signal".
fn pid_of(id: Option<u32>) -> i32 {
    match id.map(i32::try_from) {
        Some(Ok(pid)) => pid,
        _ => 0,
    }
}
88
89fn kill_group(pid: i32) {
91 if pid > 0 {
92 #[allow(unsafe_code)]
93 unsafe {
94 libc::kill(-pid, libc::SIGKILL);
95 }
96 }
97}
98
99async fn read_capped<R: tokio::io::AsyncRead + Unpin>(mut r: R, cap: usize) -> Result<Vec<u8>, String> {
102 let mut buf = Vec::new();
103 let mut chunk = [0u8; 8192];
104 loop {
105 let n = r
106 .read(&mut chunk)
107 .await
108 .map_err(|e| format!("read child output: {e}"))?;
109 if n == 0 {
110 break;
111 }
112 if buf.len() + n > cap {
113 return Err(format!("command output exceeded {cap} bytes"));
114 }
115 buf.extend_from_slice(&chunk[..n]);
116 }
117 Ok(buf)
118}
119
120fn shape(stdout: &[u8], mode: CommandOutput) -> Result<serde_json::Value, String> {
121 let s = String::from_utf8_lossy(stdout);
122 let t = s.trim();
123 match mode {
124 CommandOutput::Text => Ok(if t.is_empty() {
125 serde_json::Value::Null
126 } else {
127 serde_json::Value::String(t.to_string())
128 }),
129 CommandOutput::Json => {
130 if t.is_empty() {
131 return Ok(serde_json::Value::Null);
132 }
133 serde_json::from_str(t).map_err(|e| format!("command output is not valid JSON: {e}"))
134 },
135 CommandOutput::Lines => Ok(serde_json::Value::Array(
136 t.lines()
137 .map(str::trim)
138 .filter(|l| !l.is_empty())
139 .map(|l| serde_json::Value::String(l.to_string()))
140 .collect(),
141 )),
142 }
143}
144
145pub async fn run_oneshot(rc: &ResolvedCommand) -> Result<serde_json::Value, String> {
148 if rc.persistent {
149 return Err("this command is declared `persistent`: use commands.start/status/stop, not run".to_string());
150 }
151 let mut child = build(rc).spawn().map_err(|e| format!("spawn command: {e}"))?;
152 let pid = pid_of(child.id());
153 let out = child.stdout.take().ok_or("no stdout pipe")?;
154 let err = child.stderr.take().ok_or("no stderr pipe")?;
155
156 let work = Box::pin(async move {
157 let (o, e) = tokio::join!(read_capped(out, OUTPUT_CAP), read_capped(err, OUTPUT_CAP));
158 let status = child.wait().await.map_err(|e| format!("wait child: {e}"))?;
159 Ok::<_, String>((o?, e?, status))
160 });
161
162 let ms = rc.timeout_ms.unwrap_or(DEFAULT_ONESHOT_TIMEOUT_MS);
166 let (stdout, stderr, status) = {
167 let Ok(r) = tokio::time::timeout(Duration::from_millis(ms), work).await else {
168 kill_group(pid);
169 return Err(format!("command timed out after {ms}ms"));
170 };
171 r.inspect_err(|_| kill_group(pid))?
172 };
173
174 if !status.success() {
175 let code = status.code().map_or_else(|| "signal".to_string(), |c| c.to_string());
176 let msg = String::from_utf8_lossy(&stderr);
177 return Err(format!("command failed (exit {code}): {}", msg.trim()));
178 }
179 shape(&stdout, rc.output)
180}
181
182#[derive(Default)]
184struct Ring(Vec<u8>);
185impl Ring {
186 fn push(&mut self, b: &[u8]) {
187 self.0.extend_from_slice(b);
188 if self.0.len() > RING_CAP {
189 let cut = self.0.len() - RING_CAP;
190 self.0.drain(..cut);
191 }
192 }
193 fn text(&self) -> String {
194 String::from_utf8_lossy(&self.0).into_owned()
195 }
196}
197
198struct Proc {
199 pid: i32,
200 started: Instant,
201 stdout: Arc<Mutex<Ring>>,
202 stderr: Arc<Mutex<Ring>>,
203 exit: Arc<Mutex<Option<i32>>>,
205}
206
207pub struct SessionProcs {
210 inner: Mutex<HashMap<String, Proc>>,
211}
212
213impl Default for SessionProcs {
214 fn default() -> Self {
215 Self {
216 inner: Mutex::new(HashMap::new()),
217 }
218 }
219}
220
221impl SessionProcs {
222 pub fn start(&self, name: &str, rc: &ResolvedCommand) -> Result<i32, String> {
225 if !rc.persistent {
226 return Err("this command is not declared `persistent`: use commands.run".to_string());
227 }
228 let mut map = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
229 if let Some(p) = map.get(name)
230 && p
231 .exit
232 .lock()
233 .unwrap_or_else(std::sync::PoisonError::into_inner)
234 .is_none()
235 {
236 return Ok(p.pid); }
238 map.retain(|_, p| {
239 let alive = p
240 .exit
241 .lock()
242 .unwrap_or_else(std::sync::PoisonError::into_inner)
243 .is_none();
244 if !alive {
245 kill_group(p.pid);
246 }
247 alive
248 });
249 if map.len() >= MAX_PERSISTENT {
250 return Err(format!(
251 "too many persistent processes (max {MAX_PERSISTENT}) for this session"
252 ));
253 }
254
255 let mut child = build(rc).spawn().map_err(|e| format!("spawn command: {e}"))?;
256 let pid = pid_of(child.id());
257 let stdout = Arc::new(Mutex::new(Ring::default()));
258 let stderr = Arc::new(Mutex::new(Ring::default()));
259 let exit = Arc::new(Mutex::new(None));
260
261 if let Some(o) = child.stdout.take() {
262 pump(o, stdout.clone());
263 }
264 if let Some(e) = child.stderr.take() {
265 pump(e, stderr.clone());
266 }
267 let exit_w = exit.clone();
268 tokio::spawn(async move {
269 let code = child.wait().await.ok().and_then(|s| s.code()).unwrap_or(-1);
270 *exit_w.lock().unwrap_or_else(std::sync::PoisonError::into_inner) = Some(code);
271 });
272
273 map.insert(
274 name.to_string(),
275 Proc {
276 pid,
277 started: Instant::now(),
278 stdout,
279 stderr,
280 exit,
281 },
282 );
283 Ok(pid)
284 }
285
286 pub fn status(&self, name: &str) -> Result<serde_json::Value, String> {
287 let map = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
288 let p = map
289 .get(name)
290 .ok_or_else(|| format!("no persistent process `{name}` started in this session"))?;
291 let exit = *p.exit.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
292 Ok(serde_json::json!({
293 "name": name,
294 "pid": p.pid,
295 "running": exit.is_none(),
296 "exitCode": exit,
297 "uptimeMs": p.started.elapsed().as_millis() as u64,
298 "stdout": p.stdout.lock().unwrap_or_else(std::sync::PoisonError::into_inner).text(),
299 "stderr": p.stderr.lock().unwrap_or_else(std::sync::PoisonError::into_inner).text(),
300 }))
301 }
302
303 pub fn stop(&self, name: &str) -> Result<(), String> {
304 let mut map = self.inner.lock().unwrap_or_else(std::sync::PoisonError::into_inner);
305 match map.remove(name) {
306 Some(p) => {
307 kill_group(p.pid);
308 Ok(())
309 },
310 None => Err(format!("no persistent process `{name}` to stop")),
311 }
312 }
313}
314
315impl Drop for SessionProcs {
316 fn drop(&mut self) {
317 if let Ok(map) = self.inner.lock() {
318 for p in map.values() {
319 kill_group(p.pid);
320 }
321 }
322 }
323}
324
325fn pump<R: tokio::io::AsyncRead + Unpin + Send + 'static>(mut r: R, ring: Arc<Mutex<Ring>>) {
326 tokio::spawn(async move {
327 let mut chunk = [0u8; 8192];
328 loop {
329 match r.read(&mut chunk).await {
330 Ok(0) | Err(_) => break,
331 Ok(n) => ring
332 .lock()
333 .unwrap_or_else(std::sync::PoisonError::into_inner)
334 .push(&chunk[..n]),
335 }
336 }
337 });
338}