cmd_wrapper/
std_command.rs

1use std::{
2    ffi::OsStr,
3    io::{self, Error, Read, Result, Write},
4    path::Path,
5    process::{Child, Command, ExitStatus, Stdio},
6    sync::mpsc::{self, Receiver},
7    thread::{self, JoinHandle},
8};
9
10use derivative::Derivative;
11use tracing::{instrument, warn};
12
/// Thin wrapper around [`std::process::Command`] that exposes a chainable
/// configuration API plus several ways of running the configured program.
#[derive(Debug)]
pub struct StdCommand {
    /// The standard-library command that every builder method forwards to.
    command: Command,
}
17
18impl StdCommand {
19    pub fn new<S: Into<String>>(program: S) -> Self {
20        Self {
21            command: Command::new(program.into()),
22        }
23    }
24
25    pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self {
26        self.command.arg(arg);
27        self
28    }
29
30    pub fn args<I, S>(&mut self, args: I) -> &mut Self
31    where
32        I: IntoIterator<Item = S>,
33        S: AsRef<OsStr>,
34    {
35        self.command.args(args);
36        self
37    }
38
39    pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
40    where
41        K: AsRef<OsStr>,
42        V: AsRef<OsStr>,
43    {
44        self.command.env(key, val);
45        self
46    }
47
48    pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
49    where
50        I: IntoIterator<Item = (K, V)>,
51        K: AsRef<OsStr>,
52        V: AsRef<OsStr>,
53    {
54        self.command.envs(vars);
55        self
56    }
57
58    pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Self {
59        self.command.current_dir(dir);
60        self
61    }
62
63    pub fn stdin<T: Into<Stdio>>(&mut self, stdin: T) -> &mut Self {
64        self.command.stdin(stdin);
65        self
66    }
67
68    pub fn stdout<T: Into<Stdio>>(&mut self, stdout: T) -> &mut Self {
69        self.command.stdout(stdout);
70        self
71    }
72
73    pub fn stderr<T: Into<Stdio>>(&mut self, stderr: T) -> &mut Self {
74        self.command.stderr(stderr);
75        self
76    }
77
78    #[instrument(level = "trace")]
79    pub fn into_inner(self) -> Command {
80        self.command
81    }
82
83    #[instrument(level = "trace")]
84    pub fn child(mut self) -> Result<Child> {
85        self.command.spawn()
86    }
87
88    #[instrument(level = "trace")]
89    pub fn spawn(&mut self) -> Result<StdCommandProcess> {
90        let mut child = self.command.spawn()?;
91
92        let stdin = child
93            .stdin
94            .take()
95            .map(|s| Box::new(s) as Box<dyn Write>)
96            .unwrap_or_else(|| Box::new(SyncSink));
97        let stdout = child
98            .stdout
99            .take()
100            .map(|s| Box::new(s) as Box<dyn Read>)
101            .unwrap_or_else(|| Box::new(io::empty()));
102        let stderr = child
103            .stderr
104            .take()
105            .map(|s| Box::new(s) as Box<dyn Read + Send>)
106            .unwrap_or_else(|| Box::new(io::empty()));
107
108        let pid = child.id();
109
110        let (end_tx, end_rx) = mpsc::channel::<(Option<i32>, Option<io::Error>)>();
111        let end_handler = thread::spawn(move || {
112            let end_msg = match child.wait() {
113                Ok(status) if status.code() == Some(0) => (Some(0), None),
114                Ok(status) => (status.code(), Self::read_full_stderr_if_any(stderr).err()),
115                Err(err) => (Some(-1), Some(err)),
116            };
117
118            if let Err(err) = end_tx.send(end_msg) {
119                warn!(?err, "receiver dropped before sending end msg");
120            }
121        });
122
123        Ok(StdCommandProcess {
124            stdin,
125            stdout,
126            pid,
127            end_rx,
128            end_handler,
129        })
130    }
131
132    #[instrument(level = "trace")]
133    pub fn output(&mut self) -> Result<String> {
134        let output = self.command.output()?;
135
136        if !output.status.success() {
137            let stderr = String::from_utf8_lossy(&output.stderr);
138            return Err(Error::other(stderr));
139        }
140
141        Ok(String::from_utf8_lossy(&output.stdout).to_string())
142    }
143
144    #[instrument(level = "trace")]
145    pub fn status(&mut self) -> Result<ExitStatus> {
146        self.command.status()
147    }
148
149    #[instrument(level = "trace")]
150    pub fn execute_and_print(&mut self) -> Result<()> {
151        let output = self.command.output()?;
152        if !output.stdout.is_empty() {
153            io::stdout().write_all(&output.stdout)?;
154        }
155        if !output.stderr.is_empty() {
156            io::stderr().write_all(&output.stderr)?;
157        }
158        Ok(())
159    }
160
161    fn read_full_stderr_if_any<R: Read + Send>(mut stderr: R) -> Result<()> {
162        let mut peek_buf = vec![0u8; 1024];
163        let n = stderr.read(&mut peek_buf)?;
164        if n == 0 {
165            return Ok(());
166        }
167
168        let mut full = peek_buf[..n].to_vec();
169        let mut rest = Vec::new();
170        stderr.read_to_end(&mut rest)?;
171        full.extend(rest);
172
173        let msg = String::from_utf8_lossy(&full).trim().to_string();
174        if !msg.is_empty() {
175            return Err(Error::other(msg));
176        }
177
178        Ok(())
179    }
180}
181
/// Handles to a process started by [`StdCommand::spawn`].
#[derive(Derivative)]
#[derivative(Debug)]
pub struct StdCommandProcess {
    /// Writer feeding the child's stdin; a writer that discards everything
    /// when the spawned child exposed no stdin handle. Omitted from `Debug`.
    #[derivative(Debug = "ignore")]
    pub stdin: Box<dyn Write>,
    /// Reader over the child's stdout; an always-empty reader when the
    /// spawned child exposed no stdout handle. Omitted from `Debug`.
    #[derivative(Debug = "ignore")]
    pub stdout: Box<dyn Read>,
    /// Process id reported by the spawned child.
    pub pid: u32,
    /// Receives the `(exit_code, error)` message sent by the background
    /// thread once the child has finished.
    pub end_rx: Receiver<(Option<i32>, Option<io::Error>)>,
    /// Join handle of the background thread that waits on the child.
    pub end_handler: JoinHandle<()>,
}
193
/// Writer that accepts and discards everything written to it.
///
/// Used as the stdin stand-in when a spawned child has no stdin handle.
struct SyncSink;

impl Write for SyncSink {
    /// Reports the entire buffer as written without storing any of it.
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        let discarded = buf.len();
        Ok(discarded)
    }

    /// Nothing is ever buffered, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        Ok(())
    }
}