Skip to main content

remowt_client/
subprocess.rs

1use std::pin::pin;
2
3use anyhow::{anyhow, bail, Result};
4use camino::Utf8PathBuf;
5use futures::StreamExt as _;
6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
7use remowt_link_shared::BifConfig;
8use serde::de::DeserializeOwned;
9use tokio::io::AsyncWriteExt as _;
10use tokio::select;
11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
12use tracing::{debug, info, warn};
13
14use crate::forwarded::{RemowtListener, RemowtStream};
15use crate::{drain_to_tracing, Remowt};
16
17#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
18pub enum StdioMode {
19	#[default]
20	Null,
21	Pipe,
22	Inherit,
23}
24
25#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
26pub enum StderrMode {
27	#[default]
28	Null,
29	Pipe,
30	Inherit,
31	MergeWithStdout,
32}
33
34#[derive(Default)]
35pub struct SpawnOptions {
36	pub program: String,
37	pub args: Vec<String>,
38	pub env: Vec<(String, String)>,
39	pub env_clear: bool,
40	pub cwd: Option<Utf8PathBuf>,
41	pub escalated: bool,
42	pub stdin: StdioMode,
43	pub stdout: StdioMode,
44	pub stderr: StderrMode,
45}
46
47pub struct RemowtChild {
48	pub stdin: Option<RemowtStream>,
49	pub stdout: Option<RemowtStream>,
50	pub stderr: Option<RemowtStream>,
51	id: ProcId,
52	client: SubprocessClient<BifConfig>,
53}
54
55impl RemowtChild {
56	pub async fn wait(self) -> Result<Option<i32>> {
57		let RemowtChild {
58			stdin,
59			stdout,
60			stderr,
61			id,
62			client,
63		} = self;
64		drop(stdin);
65		let drain_out = async move {
66			if let Some(s) = stdout {
67				let _ = drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
68			}
69		};
70		let drain_err = async move {
71			if let Some(s) = stderr {
72				let _ = drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
73			}
74		};
75		let wait = async move {
76			client
77				.wait(id)
78				.await?
79				.map_err(|e| anyhow!("agent wait failed: {e}"))
80		};
81		let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
82		code
83	}
84
85	pub async fn kill(&self, signal: i32) -> Result<()> {
86		self.client
87			.kill(self.id, signal)
88			.await?
89			.map_err(|e| anyhow!("agent kill failed: {e}"))
90	}
91}
92
93fn needs_socket(m: StdioMode) -> bool {
94	matches!(m, StdioMode::Pipe | StdioMode::Inherit)
95}
96
97fn stderr_needs_socket(m: StderrMode) -> bool {
98	matches!(m, StderrMode::Pipe | StderrMode::Inherit)
99}
100
101impl Remowt {
102	pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {
103		let SpawnOptions {
104			program,
105			args,
106			env,
107			env_clear,
108			cwd,
109			escalated,
110			stdin,
111			stdout,
112			stderr,
113		} = opts;
114
115		if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {
116			bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");
117		}
118
119		let stdin_bound = if needs_socket(stdin) {
120			Some(self.bind_runtime_unix("proc-stdin").await?)
121		} else {
122			None
123		};
124		let stdout_bound = if needs_socket(stdout) {
125			Some(self.bind_runtime_unix("proc-stdout").await?)
126		} else {
127			None
128		};
129		let stderr_bound = if stderr_needs_socket(stderr) {
130			Some(self.bind_runtime_unix("proc-stderr").await?)
131		} else {
132			None
133		};
134
135		let stdin_spec = match &stdin_bound {
136			Some((_, p)) => StdioSpec::Socket(p.clone()),
137			None => StdioSpec::Null,
138		};
139		let stdout_spec = match &stdout_bound {
140			Some((_, p)) => StdioSpec::Socket(p.clone()),
141			None => StdioSpec::Null,
142		};
143		let stderr_spec = match (&stderr, &stderr_bound) {
144			(StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),
145			(StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,
146			_ => StderrSpec::Null,
147		};
148
149		let client: SubprocessClient<BifConfig> = if escalated {
150			// Boxed to break the async-fn type cycle
151			Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?
152		} else {
153			self.endpoints()
154		};
155
156		let spec = SpawnSpec {
157			program: program.clone(),
158			args,
159			env,
160			env_clear,
161			cwd,
162			stdin: stdin_spec,
163			stdout: stdout_spec,
164			stderr: stderr_spec,
165		};
166		let id = client
167			.spawn(spec)
168			.await?
169			.map_err(|e| anyhow!("agent spawn failed: {e}"))?;
170
171		let (stdin_res, stdout_res, stderr_res) = tokio::join!(
172			accept(stdin_bound),
173			accept(stdout_bound),
174			accept(stderr_bound),
175		);
176
177		let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
178		let stdout_stream = handle_output(stdout, stdout_res?, &program);
179		let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
180
181		Ok(RemowtChild {
182			stdin: stdin_stream,
183			stdout: stdout_stream,
184			stderr: stderr_stream,
185			id,
186			client,
187		})
188	}
189
190	pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {
191		let program = program.as_ref().to_owned();
192		RemowtCommand {
193			program,
194			args: vec![],
195			env: vec![],
196			remowt: self.clone(),
197			escalated: false,
198		}
199	}
200}
201
202async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {
203	match b {
204		Some((l, _)) => Ok(Some(l.accept().await?)),
205		None => Ok(None),
206	}
207}
208
209fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
210	match mode {
211		StdioMode::Pipe => s,
212		StdioMode::Inherit => {
213			if let Some(s) = s {
214				let program = program.to_owned();
215				tokio::spawn(async move {
216					let mut stdin = tokio::io::stdin();
217					let mut s = s;
218					if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {
219						warn!(program, "stdin forward ended: {e}");
220					}
221					let _ = s.shutdown().await;
222				});
223			}
224			None
225		}
226		StdioMode::Null => None,
227	}
228}
229
230fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
231	match mode {
232		StdioMode::Pipe => s,
233		StdioMode::Inherit => {
234			if let Some(s) = s {
235				let program = program.to_owned();
236				tokio::spawn(drain_to_tracing(s, program, false));
237			}
238			None
239		}
240		StdioMode::Null => None,
241	}
242}
243
244fn handle_output_err(
245	mode: StderrMode,
246	s: Option<RemowtStream>,
247	program: &str,
248) -> Option<RemowtStream> {
249	match mode {
250		StderrMode::Pipe => s,
251		StderrMode::Inherit => {
252			if let Some(s) = s {
253				let program = program.to_owned();
254				tokio::spawn(drain_to_tracing(s, program, true));
255			}
256			None
257		}
258		StderrMode::MergeWithStdout | StderrMode::Null => None,
259	}
260}
261
262fn escape_bash(input: &str, out: &mut String) {
263	const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";
264	if input.chars().all(|c| !TO_ESCAPE.contains(c)) {
265		out.push_str(input);
266		return;
267	}
268	out.push('\'');
269	for (i, v) in input.split('\'').enumerate() {
270		if i != 0 {
271			out.push_str("'\"'\"'");
272		}
273		out.push_str(v);
274	}
275	out.push('\'');
276}
277
278#[derive(Clone)]
279pub struct RemowtCommand {
280	program: String,
281	args: Vec<String>,
282	env: Vec<(String, String)>,
283	remowt: Remowt,
284	escalated: bool,
285}
286
287impl RemowtCommand {
288	pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
289		self.args.push(arg.as_ref().to_owned());
290		self
291	}
292	pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
293		for arg in args {
294			self.args.push(arg.as_ref().to_owned());
295		}
296		self
297	}
298	pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
299		self.args
300			.push(format!("{}={}", key.as_ref(), value.as_ref()));
301		self
302	}
303	pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
304		self.args.push(key.as_ref().to_owned());
305		self.args.push(value.as_ref().to_owned());
306		self
307	}
308	pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
309		self.env
310			.push((name.as_ref().to_owned(), value.as_ref().to_owned()));
311		self
312	}
313
314	pub fn sudo(mut self) -> Self {
315		self.escalated = true;
316		self
317	}
318
319	/// Only for display.
320	fn shell_line(&self) -> String {
321		let mut out = String::new();
322		if self.escalated {
323			out.push_str("run0 ");
324		}
325		if !self.env.is_empty() {
326			out.push_str("env");
327			for (k, v) in &self.env {
328				out.push(' ');
329				assert!(!k.contains('='));
330				escape_bash(k, &mut out);
331				out.push('=');
332				escape_bash(v, &mut out);
333			}
334			out.push(' ');
335		}
336		escape_bash(&self.program, &mut out);
337		for arg in &self.args {
338			out.push(' ');
339			escape_bash(arg, &mut out);
340		}
341		out
342	}
343
344	fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {
345		let line = self.shell_line();
346		let opts = SpawnOptions {
347			program: self.program,
348			args: self.args,
349			env: self.env,
350			env_clear: false,
351			cwd: None,
352			escalated: self.escalated,
353			stdin: StdioMode::Null,
354			stdout: StdioMode::Pipe,
355			stderr: StderrMode::Pipe,
356		};
357		(self.remowt, opts, line)
358	}
359
360	pub async fn run(self) -> Result<()> {
361		run_inner(self, false).await.map(|_| ())
362	}
363	pub async fn run_string(self) -> Result<String> {
364		let bytes = run_inner(self, true).await?.expect("want_stdout");
365		Ok(String::from_utf8(bytes)?)
366	}
367	pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
368		let s = self.run_string().await?;
369		Ok(serde_json::from_str(&s)?)
370	}
371}
372
373async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {
374	let (remowt, opts, line) = cmd.into_spawn_options();
375	debug!("running command {line:?} over remowt");
376	let program = opts.program.clone();
377	let mut child = remowt.spawn(opts).await?;
378	let stderr = child.stderr.take().expect("stderr=Pipe");
379	let stdout = child.stdout.take().expect("stdout=Pipe");
380
381	let mut err = FramedRead::new(stderr, LinesCodec::new());
382	let (mut out_bytes, mut out_lines) = if want_stdout {
383		(Some(FramedRead::new(stdout, BytesCodec::new())), None)
384	} else {
385		(None, Some(FramedRead::new(stdout, LinesCodec::new())))
386	};
387
388	let mut buf = if want_stdout { Some(Vec::new()) } else { None };
389
390	let mut wait = pin!(child.wait());
391	let exit = loop {
392		select! {
393			biased;
394
395			Some(e) = err.next() => {
396				let e = e?;
397				warn!(program = %program, "{e}");
398			}
399			Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {
400				buf.as_mut().expect("want_stdout").extend_from_slice(&o?);
401			}
402			Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {
403				let o = o?;
404				info!(program = %program, "{o}");
405			}
406			res = &mut wait => {
407				break res?;
408			}
409		}
410	};
411
412	while let Some(e) = err.next().await {
413		if let Ok(line) = e {
414			warn!(program = %program, "{line}");
415		}
416	}
417	if want_stdout {
418		if let Some(out_bytes) = out_bytes.as_mut() {
419			while let Some(o) = out_bytes.next().await {
420				if let Ok(chunk) = o {
421					buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
422				}
423			}
424		}
425	} else if let Some(out_lines) = out_lines.as_mut() {
426		while let Some(o) = out_lines.next().await {
427			if let Ok(line) = o {
428				info!(program = %program, "{line}");
429			}
430		}
431	}
432
433	match exit {
434		Some(0) => Ok(buf),
435		Some(c) => bail!("command '{line}' failed with status {c}"),
436		None => Err(anyhow!("command '{line}' ended without an exit status")),
437	}
438}