xscript/
tokio.rs

1//! Run commands asynchronously using [Tokio][tokio].
2//!
3//! This module adds a Tokio-based implementation of [`RunAsync`] to [`LocalEnv`] and
4//! [`ParentEnv`].
5
6use std::ffi::OsString;
7
8use tokio::io::{self, AsyncWriteExt};
9use tokio::process::Command;
10
11use crate::{
12    Cmd, In, LocalEnv, ParentEnv, RunAsync, RunError, RunErrorKind, RunOutput, RunResult, Vars,
13};
14
15impl RunAsync<OsString> for LocalEnv {
16    async fn run(&self, cmd: Cmd) -> RunResult<RunOutput, OsString> {
17        let mut command = Command::new(&*self.resolve_prog(cmd.prog()));
18        command.args(cmd.args());
19        if let Some(cwd) = cmd.cwd() {
20            command.current_dir(self.resolve_path(cwd));
21        } else {
22            command.current_dir(&self.0.cwd);
23        }
24        // Populate the environment variables.
25        if self.vars().is_clean() || cmd.vars().map(|vars| vars.is_clean()).unwrap_or(false) {
26            command.env_clear();
27        }
28        update_vars(&mut command, self.vars());
29        if let Some(vars) = cmd.vars() {
30            update_vars(&mut command, vars);
31        }
32        // Configure IO.
33        command.stdin(cmd.stdin().unwrap_or_else(|| self.default_stdin()).stdio());
34        command.stdout(
35            cmd.stdout()
36                .unwrap_or_else(|| self.default_stdout())
37                .stdio(),
38        );
39        command.stderr(
40            cmd.stderr()
41                .unwrap_or_else(|| self.default_stderr())
42                .stdio(),
43        );
44        // Make sure to kill and (eventually) reap the process when the future is aborted.
45        command.kill_on_drop(true);
46        let cmd = &cmd;
47        RunError::catch_async(cmd, async move {
48            let mut child = command.spawn()?;
49            let capture_stdout = child.stdout.is_some();
50            let capture_stderr = child.stderr.is_some();
51
52            let stdin = child.stdin.take();
53            let write_stdin_fut = async {
54                if let Some(mut stdin) = stdin {
55                    if let Some(In::Bytes(bytes)) = cmd.stdin() {
56                        stdin.write_all(bytes).await?;
57                    }
58                    stdin.flush().await?;
59                    drop(stdin);
60                }
61                Result::<(), io::Error>::Ok(())
62            };
63
64            let (write_result, read_result) =
65                tokio::join!(write_stdin_fut, child.wait_with_output());
66            let child_output = read_result?;
67            write_result?;
68
69            if self.0.replay_stdout {
70                io::stdout().write_all(&child_output.stdout).await.ok();
71            }
72            if self.0.replay_stderr {
73                io::stderr().write_all(&child_output.stderr).await.ok();
74            }
75            let output = RunOutput {
76                code: child_output.status.code(),
77                stdout: if capture_stdout {
78                    Some(child_output.stdout)
79                } else {
80                    None
81                },
82                stderr: if capture_stderr {
83                    Some(child_output.stderr)
84                } else {
85                    None
86                },
87            };
88            if child_output.status.success() || cmd.may_fail() {
89                Ok(output)
90            } else {
91                Err(RunErrorKind::Failed(output))
92            }
93        })
94        .await
95    }
96}
97
98fn update_vars(command: &mut Command, vars: &Vars) {
99    for (name, value) in vars.values() {
100        if let Some(value) = value {
101            command.env(name, value);
102        } else {
103            command.env_remove(name);
104        }
105    }
106}
107
108impl RunAsync<OsString> for ParentEnv {
109    async fn run(&self, cmd: Cmd<OsString>) -> Result<RunOutput, RunError<OsString>> {
110        // TODO: This is inefficient, we should factor out the actual launch code.
111        let env = RunError::catch(&cmd, || LocalEnv::current_dir().map_err(RunErrorKind::from))?;
112        RunAsync::run(&env, cmd).await
113    }
114}