astro_runner/
command.rs

1use astro_run::{Error, Result, RunResult, StreamSender};
2use serde::{Deserialize, Serialize};
3use std::{
4  path::{Path, PathBuf},
5  process::Stdio,
6};
7use tokio::{
8  io::{AsyncBufReadExt, BufReader},
9  process::Command as Cmd,
10};
11
/// A command to be executed by the runner.
///
/// The command text is not parsed here; it is handed as a single argument to
/// the platform shell (`sh -c` on non-Windows targets, `powershell.exe
/// -Command` on Windows) when the process is built.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Command {
  /// The shell command line to run.
  pub command: String,
  /// Working directory for the child process. When `None`, no working
  /// directory is set on the child.
  pub current_dir: Option<PathBuf>,
  /// Extra environment variables as `(key, value)` pairs, applied to the child
  /// in insertion order.
  pub envs: Vec<(String, String)>,
}
19
20impl Command {
21  pub fn new(cmd: impl Into<String>) -> Self {
22    Self {
23      command: cmd.into(),
24      current_dir: None,
25      envs: vec![],
26    }
27  }
28
29  pub fn env(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
30    self.envs.push((key.into(), value.into()));
31
32    self
33  }
34
35  pub fn dir(&mut self, dir: &Path) -> &mut Self {
36    self.current_dir = Some(dir.to_path_buf());
37
38    self
39  }
40
41  pub async fn exec(&mut self) -> Result<String> {
42    let mut command = self.build_command();
43    let output = command.output().await.map_err(|err| {
44      Error::internal_runtime_error(format!("Failed to spawn child process: {}", err))
45    })?;
46
47    if output.status.success() {
48      let stdout = String::from_utf8(output.stdout)
49        .map_err(|err| Error::internal_runtime_error(format!("Failed to parse stdout: {}", err)))?;
50      return Ok(stdout.trim().to_string());
51    }
52
53    let stderr = String::from_utf8(output.stderr)
54      .map_err(|err| Error::internal_runtime_error(format!("Failed to parse stderr: {}", err)))?;
55
56    Err(Error::internal_runtime_error(stderr))
57  }
58
59  pub async fn run(&mut self, sender: StreamSender) -> Result<()> {
60    let mut command = self.build_command();
61    let mut child = command
62      .stdout(Stdio::piped())
63      .stderr(Stdio::piped())
64      .spawn()
65      .map_err(|err| {
66        Error::internal_runtime_error(format!("Failed to spawn child process: {}", err))
67      })?;
68
69    let out = child.stdout.take().ok_or(Error::internal_runtime_error(
70      "Failed to get stdout from child process".to_string(),
71    ))?;
72    let err = child.stderr.take().ok_or(Error::internal_runtime_error(
73      "Failed to get stderr from child process".to_string(),
74    ))?;
75
76    let out = BufReader::new(out);
77    let err = BufReader::new(err);
78
79    let mut lines = out.lines();
80    let mut errors = err.lines();
81
82    loop {
83      tokio::select! {
84        line = lines.next_line() => {
85          match line {
86            Ok(Some(line)) => {
87              sender.log(line);
88            }
89            Ok(None) => {
90                break;
91            }
92            Err(err) => {
93              sender.error(err.to_string());
94              break;
95            }
96          }
97        }
98        error = errors.next_line() => {
99          match error {
100            Ok(Some(error)) => {
101              sender.error(error);
102            }
103            Ok(None) => {
104              break;
105            }
106            Err(err) => {
107              sender.error(err.to_string());
108              break;
109            }
110          }
111        }
112      }
113    }
114
115    let status = child.wait().await.map_err(|err| {
116      Error::internal_runtime_error(format!("Failed to wait for child process: {}", err))
117    })?;
118
119    let res = status
120      .code()
121      .map(|code| {
122        if code == 0 {
123          RunResult::Succeeded
124        } else {
125          RunResult::Failed { exit_code: code }
126        }
127      })
128      .unwrap_or_else(|| RunResult::Failed { exit_code: 1 });
129
130    sender.end(res);
131
132    Ok(())
133  }
134
135  fn build_command(&self) -> Cmd {
136    let mut command;
137
138    #[cfg(target_os = "windows")]
139    {
140      command = Cmd::new("powershell.exe");
141
142      command
143        .arg("-NoProfile")
144        .arg("-NonInteractive")
145        .arg("-Command")
146        .arg(self.command.clone());
147    }
148    #[cfg(not(target_os = "windows"))]
149    {
150      command = Cmd::new("sh");
151
152      command.arg("-c").arg(self.command.clone());
153    }
154
155    if let Some(dir) = &self.current_dir {
156      command.current_dir(dir);
157    }
158
159    for (key, value) in &self.envs {
160      command.env(key, value);
161    }
162
163    command
164  }
165}
166
#[cfg(test)]
mod tests {
  use super::*;
  use astro_run::stream;
  use tokio_stream::StreamExt;

  // Streams a successful `echo` and expects exactly one log line, "hello",
  // followed by a `Succeeded` result.
  #[tokio::test]
  async fn test_command() {
    let mut cmd = Command::new("echo hello");
    let (sender, mut receiver) = stream();

    let mut logs = vec![];

    // Drain the receiver concurrently with running the command; the first
    // future finishes once the receiver yields `None`.
    tokio::join!(
      async {
        while let Some(log) = receiver.next().await {
          logs.push(log);
        }
      },
      async {
        cmd.run(sender).await.unwrap();
      }
    );

    assert_eq!(logs.len(), 1);
    assert_eq!(logs[0].message, "hello");

    assert_eq!(receiver.result().unwrap(), RunResult::Succeeded);
  }

  // Checks that a variable added with `Command::env` is visible to the shell.
  #[tokio::test]
  async fn test_command_with_env() {
    // Variable expansion syntax differs between PowerShell and `sh`.
    let command = if cfg!(target_os = "windows") {
      "echo $env:HELLO"
    } else {
      "echo ${HELLO}"
    };
    let mut cmd = Command::new(command);
    cmd.env("HELLO", "world");
    let (sender, mut receiver) = stream();

    let mut logs = vec![];

    tokio::join!(
      async {
        while let Some(log) = receiver.next().await {
          logs.push(log);
        }
      },
      async {
        cmd.run(sender).await.unwrap();
      }
    );

    assert_eq!(logs.len(), 1);
    assert_eq!(logs[0].message, "world");
  }

  // `exec` returns the trimmed stdout of a successful command.
  #[tokio::test]
  async fn test_exec_command() {
    let mut cmd = Command::new("echo hello");
    let stdout = cmd.exec().await.unwrap();

    assert_eq!(stdout, "hello");
  }

  // A failing command is expected to produce exactly one error log and a
  // `Failed` result.
  // NOTE(review): this test uses `astro_run_test::test` instead of
  // `tokio::test` -- presumably the project's async test attribute; confirm.
  #[astro_run_test::test]
  async fn test_stderr_command() {
    let mut cmd = Command::new("cd /not/exist");

    let (sender, mut receiver) = stream();
    let mut logs = vec![];

    tokio::join!(
      async {
        while let Some(log) = receiver.next().await {
          logs.push(log);
        }
      },
      async {
        cmd.run(sender).await.unwrap();
      }
    );

    assert_eq!(logs.len(), 1);
    assert!(logs[0].is_error());
    assert!(matches!(
      receiver.result().unwrap(),
      RunResult::Failed { .. }
    ));
  }

  // `exec` turns an unsuccessful command into an `Err`.
  #[astro_run_test::test]
  async fn test_exec_stderr_command() {
    let mut cmd = Command::new("cd /not/exist");

    let res = cmd.exec().await;

    assert!(res.is_err());
  }
}