1use astro_run::{Error, Result, RunResult, StreamSender};
2use serde::{Deserialize, Serialize};
3use std::{
4 path::{Path, PathBuf},
5 process::Stdio,
6};
7use tokio::{
8 io::{AsyncBufReadExt, BufReader},
9 process::Command as Cmd,
10};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
14pub struct Command {
15 pub command: String,
16 pub current_dir: Option<PathBuf>,
17 pub envs: Vec<(String, String)>,
18}
19
20impl Command {
21 pub fn new(cmd: impl Into<String>) -> Self {
22 Self {
23 command: cmd.into(),
24 current_dir: None,
25 envs: vec![],
26 }
27 }
28
29 pub fn env(&mut self, key: impl Into<String>, value: impl Into<String>) -> &mut Self {
30 self.envs.push((key.into(), value.into()));
31
32 self
33 }
34
35 pub fn dir(&mut self, dir: &Path) -> &mut Self {
36 self.current_dir = Some(dir.to_path_buf());
37
38 self
39 }
40
41 pub async fn exec(&mut self) -> Result<String> {
42 let mut command = self.build_command();
43 let output = command.output().await.map_err(|err| {
44 Error::internal_runtime_error(format!("Failed to spawn child process: {}", err))
45 })?;
46
47 if output.status.success() {
48 let stdout = String::from_utf8(output.stdout)
49 .map_err(|err| Error::internal_runtime_error(format!("Failed to parse stdout: {}", err)))?;
50 return Ok(stdout.trim().to_string());
51 }
52
53 let stderr = String::from_utf8(output.stderr)
54 .map_err(|err| Error::internal_runtime_error(format!("Failed to parse stderr: {}", err)))?;
55
56 Err(Error::internal_runtime_error(stderr))
57 }
58
59 pub async fn run(&mut self, sender: StreamSender) -> Result<()> {
60 let mut command = self.build_command();
61 let mut child = command
62 .stdout(Stdio::piped())
63 .stderr(Stdio::piped())
64 .spawn()
65 .map_err(|err| {
66 Error::internal_runtime_error(format!("Failed to spawn child process: {}", err))
67 })?;
68
69 let out = child.stdout.take().ok_or(Error::internal_runtime_error(
70 "Failed to get stdout from child process".to_string(),
71 ))?;
72 let err = child.stderr.take().ok_or(Error::internal_runtime_error(
73 "Failed to get stderr from child process".to_string(),
74 ))?;
75
76 let out = BufReader::new(out);
77 let err = BufReader::new(err);
78
79 let mut lines = out.lines();
80 let mut errors = err.lines();
81
82 loop {
83 tokio::select! {
84 line = lines.next_line() => {
85 match line {
86 Ok(Some(line)) => {
87 sender.log(line);
88 }
89 Ok(None) => {
90 break;
91 }
92 Err(err) => {
93 sender.error(err.to_string());
94 break;
95 }
96 }
97 }
98 error = errors.next_line() => {
99 match error {
100 Ok(Some(error)) => {
101 sender.error(error);
102 }
103 Ok(None) => {
104 break;
105 }
106 Err(err) => {
107 sender.error(err.to_string());
108 break;
109 }
110 }
111 }
112 }
113 }
114
115 let status = child.wait().await.map_err(|err| {
116 Error::internal_runtime_error(format!("Failed to wait for child process: {}", err))
117 })?;
118
119 let res = status
120 .code()
121 .map(|code| {
122 if code == 0 {
123 RunResult::Succeeded
124 } else {
125 RunResult::Failed { exit_code: code }
126 }
127 })
128 .unwrap_or_else(|| RunResult::Failed { exit_code: 1 });
129
130 sender.end(res);
131
132 Ok(())
133 }
134
135 fn build_command(&self) -> Cmd {
136 let mut command;
137
138 #[cfg(target_os = "windows")]
139 {
140 command = Cmd::new("powershell.exe");
141
142 command
143 .arg("-NoProfile")
144 .arg("-NonInteractive")
145 .arg("-Command")
146 .arg(self.command.clone());
147 }
148 #[cfg(not(target_os = "windows"))]
149 {
150 command = Cmd::new("sh");
151
152 command.arg("-c").arg(self.command.clone());
153 }
154
155 if let Some(dir) = &self.current_dir {
156 command.current_dir(dir);
157 }
158
159 for (key, value) in &self.envs {
160 command.env(key, value);
161 }
162
163 command
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use super::*;
170 use astro_run::stream;
171 use tokio_stream::StreamExt;
172
173 #[tokio::test]
174 async fn test_command() {
175 let mut cmd = Command::new("echo hello");
176 let (sender, mut receiver) = stream();
177
178 let mut logs = vec![];
179
180 tokio::join!(
181 async {
182 while let Some(log) = receiver.next().await {
183 logs.push(log);
184 }
185 },
186 async {
187 cmd.run(sender).await.unwrap();
188 }
189 );
190
191 assert_eq!(logs.len(), 1);
192 assert_eq!(logs[0].message, "hello");
193
194 assert_eq!(receiver.result().unwrap(), RunResult::Succeeded);
195 }
196
197 #[tokio::test]
198 async fn test_command_with_env() {
199 let command = if cfg!(target_os = "windows") {
200 "echo $env:HELLO"
201 } else {
202 "echo ${HELLO}"
203 };
204 let mut cmd = Command::new(command);
205 cmd.env("HELLO", "world");
206 let (sender, mut receiver) = stream();
207
208 let mut logs = vec![];
209
210 tokio::join!(
211 async {
212 while let Some(log) = receiver.next().await {
213 logs.push(log);
214 }
215 },
216 async {
217 cmd.run(sender).await.unwrap();
218 }
219 );
220
221 assert_eq!(logs.len(), 1);
222 assert_eq!(logs[0].message, "world");
223 }
224
225 #[tokio::test]
226 async fn test_exec_command() {
227 let mut cmd = Command::new("echo hello");
228 let stdout = cmd.exec().await.unwrap();
229
230 assert_eq!(stdout, "hello");
231 }
232
233 #[astro_run_test::test]
234 async fn test_stderr_command() {
235 let mut cmd = Command::new("cd /not/exist");
236
237 let (sender, mut receiver) = stream();
238 let mut logs = vec![];
239
240 tokio::join!(
241 async {
242 while let Some(log) = receiver.next().await {
243 logs.push(log);
244 }
245 },
246 async {
247 cmd.run(sender).await.unwrap();
248 }
249 );
250
251 assert_eq!(logs.len(), 1);
252 assert!(logs[0].is_error());
253 assert!(matches!(
254 receiver.result().unwrap(),
255 RunResult::Failed { .. }
256 ));
257 }
258
259 #[astro_run_test::test]
260 async fn test_exec_stderr_command() {
261 let mut cmd = Command::new("cd /not/exist");
262
263 let res = cmd.exec().await;
264
265 assert!(res.is_err());
266 }
267}