cmd_wrapper/
std_command.rs

1use std::{
2    ffi::OsStr,
3    io::{self, Error, Read, Result, Write},
4    path::Path,
5    process::{Child, Command, ExitStatus, Stdio},
6    sync::mpsc::{self, Receiver},
7    thread::{self, JoinHandle},
8};
9
10use derivative::Derivative;
11use tracing::{instrument, warn};
12
13use crate::ProcessEnd;
14
/// Thin builder around [`std::process::Command`].
///
/// The configuration methods forward to the wrapped command and return
/// `&mut Self` so calls can be chained; the terminal methods (`spawn`,
/// `output`, `status`, ...) run it.
#[derive(Debug)]
pub struct StdCommand {
    /// The standard-library command that every method of this type delegates to.
    command: Command,
}
19
20impl StdCommand {
21    pub fn new<S: Into<String>>(program: S) -> Self {
22        Self {
23            command: Command::new(program.into()),
24        }
25    }
26
27    pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self {
28        self.command.arg(arg);
29        self
30    }
31
32    pub fn args<I, S>(&mut self, args: I) -> &mut Self
33    where
34        I: IntoIterator<Item = S>,
35        S: AsRef<OsStr>,
36    {
37        self.command.args(args);
38        self
39    }
40
41    pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
42    where
43        K: AsRef<OsStr>,
44        V: AsRef<OsStr>,
45    {
46        self.command.env(key, val);
47        self
48    }
49
50    pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
51    where
52        I: IntoIterator<Item = (K, V)>,
53        K: AsRef<OsStr>,
54        V: AsRef<OsStr>,
55    {
56        self.command.envs(vars);
57        self
58    }
59
60    pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Self {
61        self.command.current_dir(dir);
62        self
63    }
64
65    pub fn stdin<T: Into<Stdio>>(&mut self, stdin: T) -> &mut Self {
66        self.command.stdin(stdin);
67        self
68    }
69
70    pub fn stdout<T: Into<Stdio>>(&mut self, stdout: T) -> &mut Self {
71        self.command.stdout(stdout);
72        self
73    }
74
75    pub fn stderr<T: Into<Stdio>>(&mut self, stderr: T) -> &mut Self {
76        self.command.stderr(stderr);
77        self
78    }
79
80    #[instrument(level = "trace")]
81    pub fn into_inner(self) -> Command {
82        self.command
83    }
84
85    #[instrument(level = "trace")]
86    pub fn child(mut self) -> Result<Child> {
87        self.command.spawn()
88    }
89
90    #[instrument(level = "trace")]
91    pub fn spawn(&mut self) -> Result<StdCommandProcess> {
92        let mut child = self.command.spawn()?;
93
94        let stdin = child
95            .stdin
96            .take()
97            .map(|s| Box::new(s) as Box<dyn Write>)
98            .unwrap_or_else(|| Box::new(SyncSink));
99        let stdout = child
100            .stdout
101            .take()
102            .map(|s| Box::new(s) as Box<dyn Read>)
103            .unwrap_or_else(|| Box::new(io::empty()));
104        let stderr = child
105            .stderr
106            .take()
107            .map(|s| Box::new(s) as Box<dyn Read + Send>)
108            .unwrap_or_else(|| Box::new(io::empty()));
109
110        let pid = child.id();
111
112        let (end_tx, end_rx) = mpsc::channel::<ProcessEnd>();
113        let end_handler = thread::spawn(move || {
114            let end_msg = match child.wait() {
115                Ok(status) => {
116                    if let Some(0) = status.code() {
117                        ProcessEnd::Success
118                    } else {
119                        let err = Self::read_full_stderr_if_any(stderr).err();
120                        if let Some(code) = status.code() {
121                            ProcessEnd::Failed(code, err)
122                        } else {
123                            ProcessEnd::Killed(err)
124                        }
125                    }
126                }
127                Err(err) => ProcessEnd::Error(err),
128            };
129
130            if let Err(err) = end_tx.send(end_msg) {
131                warn!(?err, "receiver dropped before sending end msg");
132            }
133        });
134
135        Ok(StdCommandProcess {
136            stdin,
137            stdout,
138            pid,
139            end_rx,
140            end_handler,
141        })
142    }
143
144    #[instrument(level = "trace")]
145    pub fn output(&mut self) -> Result<String> {
146        let output = self.command.output()?;
147
148        if !output.status.success() {
149            let stderr = String::from_utf8_lossy(&output.stderr);
150            return Err(Error::other(stderr));
151        }
152
153        Ok(String::from_utf8_lossy(&output.stdout).to_string())
154    }
155
156    #[instrument(level = "trace")]
157    pub fn status(&mut self) -> Result<ExitStatus> {
158        self.command.status()
159    }
160
161    #[instrument(level = "trace")]
162    pub fn execute_and_print(&mut self) -> Result<()> {
163        let output = self.command.output()?;
164        if !output.stdout.is_empty() {
165            io::stdout().write_all(&output.stdout)?;
166        }
167        if !output.stderr.is_empty() {
168            io::stderr().write_all(&output.stderr)?;
169        }
170        Ok(())
171    }
172
173    fn read_full_stderr_if_any<R: Read + Send>(mut stderr: R) -> Result<()> {
174        let mut peek_buf = vec![0u8; 1024];
175        let n = stderr.read(&mut peek_buf)?;
176        if n == 0 {
177            return Ok(());
178        }
179
180        let mut full = peek_buf[..n].to_vec();
181        let mut rest = Vec::new();
182        stderr.read_to_end(&mut rest)?;
183        full.extend(rest);
184
185        let msg = String::from_utf8_lossy(&full).trim().to_string();
186        if !msg.is_empty() {
187            return Err(Error::other(msg));
188        }
189
190        Ok(())
191    }
192}
193
194#[derive(Derivative)]
195#[derivative(Debug)]
196pub struct StdCommandProcess {
197    #[derivative(Debug = "ignore")]
198    pub stdin: Box<dyn Write>,
199    #[derivative(Debug = "ignore")]
200    pub stdout: Box<dyn Read>,
201    pub pid: u32,
202    pub end_rx: Receiver<ProcessEnd>,
203    pub end_handler: JoinHandle<()>,
204}
205
/// Writer that accepts any input and throws it away.
struct SyncSink;

impl Write for SyncSink {
    /// Reports the whole buffer as written without storing any of it.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let accepted = buf.len();
        Ok(accepted)
    }

    /// Nothing is ever buffered, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}