1#[cfg(unix)]
2use std::os::unix::process::ExitStatusExt;
3#[cfg(windows)]
4use std::os::windows::process::CommandExt;
5use std::{
6 ffi::OsStr,
7 io::Write,
8 process::{Command, Stdio},
9 sync::{Arc, RwLock},
10 thread::spawn,
11};
12
13use os_pipe::{pipe, PipeWriter};
14use serde::{Deserialize, Serialize};
15use shared_child::SharedChild;
16use tokio::sync::mpsc;
17
18use crate::{encoding::Encoding, options::CommandOptions, StdoutReader};
19
20pub type ProcessId = u32;
21
22#[cfg(windows)]
23const CREATE_NO_WINDOW: u32 = 0x08000000;
24
25#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
26#[serde(untagged)]
27pub enum Buffer {
28 Text(String),
29 Raw(Vec<u8>),
30}
31
32impl Buffer {
33 pub fn new(is_raw: bool) -> Buffer {
36 if is_raw {
37 Buffer::Raw(Vec::new())
38 } else {
39 Buffer::Text(String::new())
40 }
41 }
42
43 pub fn push(&mut self, buffer: Buffer) -> crate::Result<()> {
53 match self {
54 Buffer::Text(string) => {
55 let incoming_string =
56 buffer.as_str().ok_or(crate::Error::InvalidBuffer)?;
57
58 string.push_str(incoming_string);
59 }
60 Buffer::Raw(bytes) => bytes.extend_from_slice(buffer.as_bytes()),
61 }
62
63 Ok(())
64 }
65
66 pub fn as_str(&self) -> Option<&str> {
69 match self {
70 Buffer::Text(string) => Some(string),
71 Buffer::Raw(_) => None,
72 }
73 }
74
75 pub fn as_bytes(&self) -> &[u8] {
77 match self {
78 Buffer::Text(string) => string.as_bytes(),
79 Buffer::Raw(bytes) => bytes,
80 }
81 }
82}
83
84#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
86#[serde(tag = "type", content = "data", rename_all = "snake_case")]
87pub enum ChildProcessEvent {
88 Stdout(Buffer),
92
93 Stderr(Buffer),
97
98 Error(String),
100
101 Terminated(ExitStatus),
103}
104
105#[derive(Debug)]
107pub struct ChildProcess {
108 inner: Arc<SharedChild>,
109 stdin_writer: PipeWriter,
110 rx: mpsc::Receiver<ChildProcessEvent>,
111}
112
113impl ChildProcess {
114 pub fn write(&mut self, buffer: &[u8]) -> crate::Result<()> {
116 self.stdin_writer.write_all(buffer)?;
117 Ok(())
118 }
119
120 pub fn kill(self) -> crate::Result<()> {
122 self.inner.kill()?;
123 Ok(())
124 }
125
126 pub fn pid(&self) -> u32 {
128 self.inner.id()
129 }
130
131 pub fn events(&mut self) -> &mut mpsc::Receiver<ChildProcessEvent> {
133 &mut self.rx
134 }
135}
136
137#[derive(Clone, Debug, Default, Deserialize, Eq, PartialEq, Serialize)]
139pub struct ExitStatus {
140 pub code: Option<i32>,
142
143 pub success: bool,
145
146 pub signal: Option<i32>,
148}
149
150#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
152pub struct ShellExecOutput {
153 #[serde(flatten)]
155 pub status: ExitStatus,
156
157 pub stdout: Buffer,
159
160 pub stderr: Buffer,
162}
163
164#[derive(Debug)]
166pub struct Shell;
167
168impl Shell {
169 pub async fn exec<I, S>(
184 program: &str,
185 args: I,
186 options: &CommandOptions,
187 ) -> crate::Result<ShellExecOutput>
188 where
189 I: IntoIterator<Item = S>,
190 S: AsRef<OsStr>,
191 {
192 let mut child = Self::spawn(program, args, options)?;
193
194 let mut status = ExitStatus::default();
195 let mut stdout = Buffer::new(options.encoding == Encoding::Raw);
196 let mut stderr = Buffer::new(options.encoding == Encoding::Raw);
197
198 while let Some(event) = child.events().recv().await {
199 match event {
200 ChildProcessEvent::Terminated(exit_status) => {
201 status = exit_status;
202 }
203 ChildProcessEvent::Stdout(line) => {
204 stdout.push(line)?;
205 }
206 ChildProcessEvent::Stderr(line) => {
207 stderr.push(line)?;
208 }
209 ChildProcessEvent::Error(_) => {}
210 }
211 }
212
213 Ok(ShellExecOutput {
214 status,
215 stdout,
216 stderr,
217 })
218 }
219
220 pub async fn status<I, S>(
233 &self,
234 program: &str,
235 args: I,
236 options: &CommandOptions,
237 ) -> crate::Result<ExitStatus>
238 where
239 I: IntoIterator<Item = S>,
240 S: AsRef<OsStr>,
241 {
242 let mut child = Self::spawn(program, args, options)?;
243
244 while let Some(event) = child.events().recv().await {
245 if let ChildProcessEvent::Terminated(status) = event {
246 return Ok(status);
247 }
248 }
249
250 Ok(ExitStatus::default())
251 }
252
253 pub fn spawn<I, S>(
269 program: &str,
270 args: I,
271 options: &CommandOptions,
272 ) -> crate::Result<ChildProcess>
273 where
274 I: IntoIterator<Item = S>,
275 S: AsRef<OsStr>,
276 {
277 let mut command = Self::create_command(program, args, options);
278 Self::spawn_child(&mut command, options)
279 }
280
281 fn spawn_child(
283 command: &mut Command,
284 options: &CommandOptions,
285 ) -> crate::Result<ChildProcess> {
286 let (stdout_reader, stdout_writer) = pipe()?;
287 let (stderr_reader, stderr_writer) = pipe()?;
288 let (stdin_reader, stdin_writer) = pipe()?;
289
290 command.stdout(stdout_writer);
291 command.stderr(stderr_writer);
292 command.stdin(stdin_reader);
293
294 let shared_child = SharedChild::spawn(command)?;
295 let child = Arc::new(shared_child);
296 let child_ = child.clone();
297 let guard = Arc::new(RwLock::new(()));
298
299 let (tx, rx) = mpsc::channel(1);
300
301 Self::spawn_pipe_reader(
302 tx.clone(),
303 guard.clone(),
304 stdout_reader,
305 ChildProcessEvent::Stdout,
306 options.encoding.clone(),
307 );
308
309 Self::spawn_pipe_reader(
310 tx.clone(),
311 guard.clone(),
312 stderr_reader,
313 ChildProcessEvent::Stderr,
314 options.encoding.clone(),
315 );
316
317 spawn(move || {
318 let status = child_.wait();
319 let _lock = guard.write().unwrap();
320
321 let event = match status {
322 Ok(status) => ChildProcessEvent::Terminated(ExitStatus {
323 code: status.code(),
324 success: status.code().is_some_and(|code| code == 0),
325 #[cfg(windows)]
326 signal: None,
327 #[cfg(unix)]
328 signal: status.signal(),
329 }),
330 Err(err) => ChildProcessEvent::Error(err.to_string()),
331 };
332
333 let _ = tx.blocking_send(event);
334 });
335
336 Ok(ChildProcess {
337 inner: child,
338 stdin_writer,
339 rx,
340 })
341 }
342
343 fn create_command<I, S>(
345 program: &str,
346 args: I,
347 options: &CommandOptions,
348 ) -> Command
349 where
350 I: IntoIterator<Item = S>,
351 S: AsRef<OsStr>,
352 {
353 let mut command = Command::new(program);
354
355 if let Some(cwd) = &options.cwd {
356 command.current_dir(cwd);
357 }
358
359 if options.clear_env {
360 command.env_clear();
361 }
362
363 command.stdout(Stdio::piped());
364 command.stdin(Stdio::piped());
365 command.stderr(Stdio::piped());
366 command.args(args);
367 command.envs(&options.env);
368
369 #[cfg(windows)]
370 command.creation_flags(CREATE_NO_WINDOW);
371
372 command
373 }
374
375 fn spawn_pipe_reader<F>(
378 tx: mpsc::Sender<ChildProcessEvent>,
379 guard: Arc<RwLock<()>>,
380 pipe: os_pipe::PipeReader,
381 wrapper: F,
382 encoding: Encoding,
383 ) where
384 F: Fn(Buffer) -> ChildProcessEvent + Send + Copy + 'static,
385 {
386 spawn(move || {
387 let _lock = guard.read().unwrap();
388 let mut reader = StdoutReader::new(pipe, encoding);
389
390 while let Ok(Some(buffer)) = reader.read_next() {
391 if tx.blocking_send(wrapper(buffer)).is_err() {
392 break;
393 }
394 }
395 });
396 }
397}
398
399#[cfg(test)]
400mod tests {
401 use super::*;
402
403 #[tokio::test]
404 async fn test_echo_command() {
405 let output = Shell::exec(
406 if cfg!(windows) { "cmd" } else { "sh" },
407 &[if cfg!(windows) { "/C" } else { "-c" }, "echo hello world"],
408 &CommandOptions::default(),
409 )
410 .await
411 .unwrap();
412
413 assert!(output.status.success);
414 assert!(output.stderr.as_str().unwrap().is_empty());
415 assert!(output.stdout.as_str().unwrap().contains("hello world"));
416 }
417
418 #[tokio::test]
419 async fn test_command_failure() {
420 let output = Shell::exec(
421 if cfg!(windows) { "cmd" } else { "sh" },
422 &[
423 if cfg!(windows) { "/C" } else { "-c" },
424 "nonexistent_command",
425 ],
426 &CommandOptions::default(),
427 )
428 .await
429 .unwrap();
430
431 assert!(!output.status.success);
432 assert!(!output.stderr.as_str().unwrap().is_empty());
433 }
434
435 #[tokio::test]
436 async fn test_raw_output() {
437 let options = CommandOptions {
438 encoding: Encoding::Raw,
439 ..Default::default()
440 };
441
442 let mut child = Shell::spawn(
443 if cfg!(windows) { "cmd" } else { "sh" },
444 [if cfg!(windows) { "/C" } else { "-c" }, "echo test"],
445 &options,
446 )
447 .unwrap();
448
449 let mut saw_stdout = false;
450 while let Some(event) = child.events().recv().await {
451 match event {
452 ChildProcessEvent::Stdout(Buffer::Raw(bytes)) => {
453 assert!(!bytes.is_empty());
454 saw_stdout = true;
455 }
456 ChildProcessEvent::Terminated(status) => {
457 assert!(status.success);
458 }
459 _ => {}
460 }
461 }
462 assert!(saw_stdout);
463 }
464}