1use std::pin::pin;
2
3use anyhow::{anyhow, bail, Result};
4use camino::Utf8PathBuf;
5use futures::StreamExt as _;
6use remowt_endpoints::subprocess::{ProcId, SpawnSpec, StderrSpec, StdioSpec, SubprocessClient};
7use remowt_link_shared::BifConfig;
8use serde::de::DeserializeOwned;
9use tokio::io::AsyncWriteExt as _;
10use tokio::select;
11use tokio_util::codec::{BytesCodec, FramedRead, LinesCodec};
12use tracing::{debug, info, warn};
13
14use crate::forwarded::{RemowtListener, RemowtStream};
15use crate::{drain_to_tracing, Remowt};
16
/// Wiring choice for a spawned child's stdin or stdout.
///
/// The default is [`StdioMode::Null`].
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum StdioMode {
    /// No socket is set up for the handle.
    #[default]
    Null,
    /// A socket is set up and the connected stream is handed back to the caller.
    Pipe,
    /// A socket is set up and serviced by a background task; nothing is handed
    /// back to the caller.
    Inherit,
}
24
/// Wiring choice for a spawned child's stderr.
///
/// The default is [`StderrMode::Null`].
#[derive(Debug, Default, Clone, Copy, Eq, PartialEq)]
pub enum StderrMode {
    /// No socket is set up for stderr.
    #[default]
    Null,
    /// A socket is set up and the connected stream is handed back to the caller.
    Pipe,
    /// A socket is set up and serviced by a background task; nothing is handed
    /// back to the caller.
    Inherit,
    /// No separate socket is set up; the agent is asked to merge stderr into
    /// stdout. Spawning rejects this unless stdout is `Pipe` or `Inherit`.
    MergeWithStdout,
}
33
34#[derive(Default)]
35pub struct SpawnOptions {
36 pub program: String,
37 pub args: Vec<String>,
38 pub env: Vec<(String, String)>,
39 pub env_clear: bool,
40 pub cwd: Option<Utf8PathBuf>,
41 pub escalated: bool,
42 pub stdin: StdioMode,
43 pub stdout: StdioMode,
44 pub stderr: StderrMode,
45}
46
47pub struct RemowtChild {
48 pub stdin: Option<RemowtStream>,
49 pub stdout: Option<RemowtStream>,
50 pub stderr: Option<RemowtStream>,
51 id: ProcId,
52 client: SubprocessClient<BifConfig>,
53}
54
55impl RemowtChild {
56 pub async fn wait(self) -> Result<Option<i32>> {
57 let RemowtChild {
58 stdin,
59 stdout,
60 stderr,
61 id,
62 client,
63 } = self;
64 drop(stdin);
65 let drain_out = async move {
66 if let Some(s) = stdout {
67 let _ = drain_to_tracing(s, "<child stdout>".to_owned(), false).await;
68 }
69 };
70 let drain_err = async move {
71 if let Some(s) = stderr {
72 let _ = drain_to_tracing(s, "<child stderr>".to_owned(), true).await;
73 }
74 };
75 let wait = async move {
76 client
77 .wait(id)
78 .await?
79 .map_err(|e| anyhow!("agent wait failed: {e}"))
80 };
81 let (code, _, _) = tokio::join!(wait, drain_out, drain_err);
82 code
83 }
84
85 pub async fn kill(&self, signal: i32) -> Result<()> {
86 self.client
87 .kill(self.id, signal)
88 .await?
89 .map_err(|e| anyhow!("agent kill failed: {e}"))
90 }
91}
92
93fn needs_socket(m: StdioMode) -> bool {
94 matches!(m, StdioMode::Pipe | StdioMode::Inherit)
95}
96
97fn stderr_needs_socket(m: StderrMode) -> bool {
98 matches!(m, StderrMode::Pipe | StderrMode::Inherit)
99}
100
101impl Remowt {
102 pub async fn spawn(&self, opts: SpawnOptions) -> Result<RemowtChild> {
103 let SpawnOptions {
104 program,
105 args,
106 env,
107 env_clear,
108 cwd,
109 escalated,
110 stdin,
111 stdout,
112 stderr,
113 } = opts;
114
115 if matches!(stderr, StderrMode::MergeWithStdout) && !needs_socket(stdout) {
116 bail!("stderr=MergeWithStdout requires stdout=Pipe or Inherit");
117 }
118
119 let stdin_bound = if needs_socket(stdin) {
120 Some(self.bind_runtime_unix("proc-stdin").await?)
121 } else {
122 None
123 };
124 let stdout_bound = if needs_socket(stdout) {
125 Some(self.bind_runtime_unix("proc-stdout").await?)
126 } else {
127 None
128 };
129 let stderr_bound = if stderr_needs_socket(stderr) {
130 Some(self.bind_runtime_unix("proc-stderr").await?)
131 } else {
132 None
133 };
134
135 let stdin_spec = match &stdin_bound {
136 Some((_, p)) => StdioSpec::Socket(p.clone()),
137 None => StdioSpec::Null,
138 };
139 let stdout_spec = match &stdout_bound {
140 Some((_, p)) => StdioSpec::Socket(p.clone()),
141 None => StdioSpec::Null,
142 };
143 let stderr_spec = match (&stderr, &stderr_bound) {
144 (StderrMode::Pipe | StderrMode::Inherit, Some((_, p))) => StderrSpec::Socket(p.clone()),
145 (StderrMode::MergeWithStdout, _) => StderrSpec::MergeWithStdout,
146 _ => StderrSpec::Null,
147 };
148
149 let client: SubprocessClient<BifConfig> = if escalated {
150 Box::pin(self.run0_endpoints::<SubprocessClient<BifConfig>>()).await?
152 } else {
153 self.endpoints()
154 };
155
156 let spec = SpawnSpec {
157 program: program.clone(),
158 args,
159 env,
160 env_clear,
161 cwd,
162 stdin: stdin_spec,
163 stdout: stdout_spec,
164 stderr: stderr_spec,
165 };
166 let id = client
167 .spawn(spec)
168 .await?
169 .map_err(|e| anyhow!("agent spawn failed: {e}"))?;
170
171 let (stdin_res, stdout_res, stderr_res) = tokio::join!(
172 accept(stdin_bound),
173 accept(stdout_bound),
174 accept(stderr_bound),
175 );
176
177 let stdin_stream = handle_stdin(stdin, stdin_res?, &program);
178 let stdout_stream = handle_output(stdout, stdout_res?, &program);
179 let stderr_stream = handle_output_err(stderr, stderr_res?, &program);
180
181 Ok(RemowtChild {
182 stdin: stdin_stream,
183 stdout: stdout_stream,
184 stderr: stderr_stream,
185 id,
186 client,
187 })
188 }
189
190 pub fn cmd(&self, program: impl AsRef<str>) -> RemowtCommand {
191 let program = program.as_ref().to_owned();
192 RemowtCommand {
193 program,
194 args: vec![],
195 env: vec![],
196 remowt: self.clone(),
197 escalated: false,
198 }
199 }
200}
201
202async fn accept(b: Option<(RemowtListener, Utf8PathBuf)>) -> Result<Option<RemowtStream>> {
203 match b {
204 Some((l, _)) => Ok(Some(l.accept().await?)),
205 None => Ok(None),
206 }
207}
208
209fn handle_stdin(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
210 match mode {
211 StdioMode::Pipe => s,
212 StdioMode::Inherit => {
213 if let Some(s) = s {
214 let program = program.to_owned();
215 tokio::spawn(async move {
216 let mut stdin = tokio::io::stdin();
217 let mut s = s;
218 if let Err(e) = tokio::io::copy(&mut stdin, &mut s).await {
219 warn!(program, "stdin forward ended: {e}");
220 }
221 let _ = s.shutdown().await;
222 });
223 }
224 None
225 }
226 StdioMode::Null => None,
227 }
228}
229
230fn handle_output(mode: StdioMode, s: Option<RemowtStream>, program: &str) -> Option<RemowtStream> {
231 match mode {
232 StdioMode::Pipe => s,
233 StdioMode::Inherit => {
234 if let Some(s) = s {
235 let program = program.to_owned();
236 tokio::spawn(drain_to_tracing(s, program, false));
237 }
238 None
239 }
240 StdioMode::Null => None,
241 }
242}
243
244fn handle_output_err(
245 mode: StderrMode,
246 s: Option<RemowtStream>,
247 program: &str,
248) -> Option<RemowtStream> {
249 match mode {
250 StderrMode::Pipe => s,
251 StderrMode::Inherit => {
252 if let Some(s) = s {
253 let program = program.to_owned();
254 tokio::spawn(drain_to_tracing(s, program, true));
255 }
256 None
257 }
258 StderrMode::MergeWithStdout | StderrMode::Null => None,
259 }
260}
261
/// Appends `input` to `out` as a single bash word.
///
/// Words made only of harmless characters are appended verbatim. Anything else
/// is wrapped in single quotes, with each embedded `'` written as `'"'"'`
/// (close the quote, emit a double-quoted `'`, reopen the quote).
///
/// A word is quoted when it
/// - is empty (otherwise the argument would vanish from the rendered line),
/// - contains any whitespace (space, tab, newline, …),
/// - contains one of the shell metacharacters in `TO_ESCAPE`, or
/// - starts with `~` (which bash would otherwise tilde-expand).
fn escape_bash(input: &str, out: &mut String) {
    const TO_ESCAPE: &str = "$ !\"#&'()*,;<>?[\\]^`{|}";

    let needs_quoting = input.is_empty()
        || input.starts_with('~')
        || input
            .chars()
            .any(|c| c.is_whitespace() || TO_ESCAPE.contains(c));
    if !needs_quoting {
        out.push_str(input);
        return;
    }

    out.push('\'');
    for (i, piece) in input.split('\'').enumerate() {
        if i != 0 {
            out.push_str("'\"'\"'");
        }
        out.push_str(piece);
    }
    out.push('\'');
}
277
278#[derive(Clone)]
279pub struct RemowtCommand {
280 program: String,
281 args: Vec<String>,
282 env: Vec<(String, String)>,
283 remowt: Remowt,
284 escalated: bool,
285}
286
287impl RemowtCommand {
288 pub fn arg(&mut self, arg: impl AsRef<str>) -> &mut Self {
289 self.args.push(arg.as_ref().to_owned());
290 self
291 }
292 pub fn args<V: AsRef<str>>(&mut self, args: impl IntoIterator<Item = V>) -> &mut Self {
293 for arg in args {
294 self.args.push(arg.as_ref().to_owned());
295 }
296 self
297 }
298 pub fn eqarg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
299 self.args
300 .push(format!("{}={}", key.as_ref(), value.as_ref()));
301 self
302 }
303 pub fn comparg(&mut self, key: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
304 self.args.push(key.as_ref().to_owned());
305 self.args.push(value.as_ref().to_owned());
306 self
307 }
308 pub fn env(&mut self, name: impl AsRef<str>, value: impl AsRef<str>) -> &mut Self {
309 self.env
310 .push((name.as_ref().to_owned(), value.as_ref().to_owned()));
311 self
312 }
313
314 pub fn sudo(mut self) -> Self {
315 self.escalated = true;
316 self
317 }
318
319 fn shell_line(&self) -> String {
321 let mut out = String::new();
322 if self.escalated {
323 out.push_str("run0 ");
324 }
325 if !self.env.is_empty() {
326 out.push_str("env");
327 for (k, v) in &self.env {
328 out.push(' ');
329 assert!(!k.contains('='));
330 escape_bash(k, &mut out);
331 out.push('=');
332 escape_bash(v, &mut out);
333 }
334 out.push(' ');
335 }
336 escape_bash(&self.program, &mut out);
337 for arg in &self.args {
338 out.push(' ');
339 escape_bash(arg, &mut out);
340 }
341 out
342 }
343
344 fn into_spawn_options(self) -> (Remowt, SpawnOptions, String) {
345 let line = self.shell_line();
346 let opts = SpawnOptions {
347 program: self.program,
348 args: self.args,
349 env: self.env,
350 env_clear: false,
351 cwd: None,
352 escalated: self.escalated,
353 stdin: StdioMode::Null,
354 stdout: StdioMode::Pipe,
355 stderr: StderrMode::Pipe,
356 };
357 (self.remowt, opts, line)
358 }
359
360 pub async fn run(self) -> Result<()> {
361 run_inner(self, false).await.map(|_| ())
362 }
363 pub async fn run_string(self) -> Result<String> {
364 let bytes = run_inner(self, true).await?.expect("want_stdout");
365 Ok(String::from_utf8(bytes)?)
366 }
367 pub async fn run_value<T: DeserializeOwned>(self) -> Result<T> {
368 let s = self.run_string().await?;
369 Ok(serde_json::from_str(&s)?)
370 }
371}
372
373async fn run_inner(cmd: RemowtCommand, want_stdout: bool) -> Result<Option<Vec<u8>>> {
374 let (remowt, opts, line) = cmd.into_spawn_options();
375 debug!("running command {line:?} over remowt");
376 let program = opts.program.clone();
377 let mut child = remowt.spawn(opts).await?;
378 let stderr = child.stderr.take().expect("stderr=Pipe");
379 let stdout = child.stdout.take().expect("stdout=Pipe");
380
381 let mut err = FramedRead::new(stderr, LinesCodec::new());
382 let (mut out_bytes, mut out_lines) = if want_stdout {
383 (Some(FramedRead::new(stdout, BytesCodec::new())), None)
384 } else {
385 (None, Some(FramedRead::new(stdout, LinesCodec::new())))
386 };
387
388 let mut buf = if want_stdout { Some(Vec::new()) } else { None };
389
390 let mut wait = pin!(child.wait());
391 let exit = loop {
392 select! {
393 biased;
394
395 Some(e) = err.next() => {
396 let e = e?;
397 warn!(program = %program, "{e}");
398 }
399 Some(o) = async { out_bytes.as_mut()?.next().await }, if want_stdout => {
400 buf.as_mut().expect("want_stdout").extend_from_slice(&o?);
401 }
402 Some(o) = async { out_lines.as_mut()?.next().await }, if !want_stdout => {
403 let o = o?;
404 info!(program = %program, "{o}");
405 }
406 res = &mut wait => {
407 break res?;
408 }
409 }
410 };
411
412 while let Some(e) = err.next().await {
413 if let Ok(line) = e {
414 warn!(program = %program, "{line}");
415 }
416 }
417 if want_stdout {
418 if let Some(out_bytes) = out_bytes.as_mut() {
419 while let Some(o) = out_bytes.next().await {
420 if let Ok(chunk) = o {
421 buf.as_mut().expect("want_stdout").extend_from_slice(&chunk);
422 }
423 }
424 }
425 } else if let Some(out_lines) = out_lines.as_mut() {
426 while let Some(o) = out_lines.next().await {
427 if let Ok(line) = o {
428 info!(program = %program, "{line}");
429 }
430 }
431 }
432
433 match exit {
434 Some(0) => Ok(buf),
435 Some(c) => bail!("command '{line}' failed with status {c}"),
436 None => Err(anyhow!("command '{line}' ended without an exit status")),
437 }
438}