cmd_wrapper/
std_command.rs1use std::{
2 ffi::OsStr,
3 io::{self, Error, Read, Result, Write},
4 path::Path,
5 process::{Child, Command, ExitStatus, Stdio},
6 sync::mpsc::{self, Receiver},
7 thread::{self, JoinHandle},
8};
9
10use derivative::Derivative;
11use tracing::{instrument, warn};
12
13use crate::ProcessEnd;
14
/// Thin builder around [`std::process::Command`].
///
/// The wrapped command is configured through the forwarding methods on this
/// type and then either run to completion or spawned as a background process.
#[derive(Debug)]
pub struct StdCommand {
    /// The underlying standard-library command being configured.
    command: Command,
}
19
20impl StdCommand {
21 pub fn new<S: Into<String>>(program: S) -> Self {
22 Self {
23 command: Command::new(program.into()),
24 }
25 }
26
27 pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self {
28 self.command.arg(arg);
29 self
30 }
31
32 pub fn args<I, S>(&mut self, args: I) -> &mut Self
33 where
34 I: IntoIterator<Item = S>,
35 S: AsRef<OsStr>,
36 {
37 self.command.args(args);
38 self
39 }
40
41 pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
42 where
43 K: AsRef<OsStr>,
44 V: AsRef<OsStr>,
45 {
46 self.command.env(key, val);
47 self
48 }
49
50 pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
51 where
52 I: IntoIterator<Item = (K, V)>,
53 K: AsRef<OsStr>,
54 V: AsRef<OsStr>,
55 {
56 self.command.envs(vars);
57 self
58 }
59
60 pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Self {
61 self.command.current_dir(dir);
62 self
63 }
64
65 pub fn stdin<T: Into<Stdio>>(&mut self, stdin: T) -> &mut Self {
66 self.command.stdin(stdin);
67 self
68 }
69
70 pub fn stdout<T: Into<Stdio>>(&mut self, stdout: T) -> &mut Self {
71 self.command.stdout(stdout);
72 self
73 }
74
75 pub fn stderr<T: Into<Stdio>>(&mut self, stderr: T) -> &mut Self {
76 self.command.stderr(stderr);
77 self
78 }
79
80 #[instrument(level = "trace")]
81 pub fn into_inner(self) -> Command {
82 self.command
83 }
84
85 #[instrument(level = "trace")]
86 pub fn child(mut self) -> Result<Child> {
87 self.command.spawn()
88 }
89
90 #[instrument(level = "trace")]
91 pub fn spawn(&mut self) -> Result<StdCommandProcess> {
92 let mut child = self.command.spawn()?;
93
94 let stdin = child
95 .stdin
96 .take()
97 .map(|s| Box::new(s) as Box<dyn Write>)
98 .unwrap_or_else(|| Box::new(SyncSink));
99 let stdout = child
100 .stdout
101 .take()
102 .map(|s| Box::new(s) as Box<dyn Read>)
103 .unwrap_or_else(|| Box::new(io::empty()));
104 let stderr = child
105 .stderr
106 .take()
107 .map(|s| Box::new(s) as Box<dyn Read + Send>)
108 .unwrap_or_else(|| Box::new(io::empty()));
109
110 let pid = child.id();
111
112 let (end_tx, end_rx) = mpsc::channel::<ProcessEnd>();
113 let end_handler = thread::spawn(move || {
114 let end_msg = match child.wait() {
115 Ok(status) => {
116 if let Some(0) = status.code() {
117 ProcessEnd::Success
118 } else {
119 let err = Self::read_full_stderr_if_any(stderr).err();
120 if let Some(code) = status.code() {
121 ProcessEnd::Failed(code, err)
122 } else {
123 ProcessEnd::Killed(err)
124 }
125 }
126 }
127 Err(err) => ProcessEnd::Error(err),
128 };
129
130 if let Err(err) = end_tx.send(end_msg) {
131 warn!(?err, "receiver dropped before sending end msg");
132 }
133 });
134
135 Ok(StdCommandProcess {
136 stdin,
137 stdout,
138 pid,
139 end_rx,
140 end_handler,
141 })
142 }
143
144 #[instrument(level = "trace")]
145 pub fn output(&mut self) -> Result<String> {
146 let output = self.command.output()?;
147
148 if !output.status.success() {
149 let stderr = String::from_utf8_lossy(&output.stderr);
150 return Err(Error::other(stderr));
151 }
152
153 Ok(String::from_utf8_lossy(&output.stdout).to_string())
154 }
155
156 #[instrument(level = "trace")]
157 pub fn status(&mut self) -> Result<ExitStatus> {
158 self.command.status()
159 }
160
161 #[instrument(level = "trace")]
162 pub fn execute_and_print(&mut self) -> Result<()> {
163 let output = self.command.output()?;
164 if !output.stdout.is_empty() {
165 io::stdout().write_all(&output.stdout)?;
166 }
167 if !output.stderr.is_empty() {
168 io::stderr().write_all(&output.stderr)?;
169 }
170 Ok(())
171 }
172
173 fn read_full_stderr_if_any<R: Read + Send>(mut stderr: R) -> Result<()> {
174 let mut peek_buf = vec![0u8; 1024];
175 let n = stderr.read(&mut peek_buf)?;
176 if n == 0 {
177 return Ok(());
178 }
179
180 let mut full = peek_buf[..n].to_vec();
181 let mut rest = Vec::new();
182 stderr.read_to_end(&mut rest)?;
183 full.extend(rest);
184
185 let msg = String::from_utf8_lossy(&full).trim().to_string();
186 if !msg.is_empty() {
187 return Err(Error::other(msg));
188 }
189
190 Ok(())
191 }
192}
193
194#[derive(Derivative)]
195#[derivative(Debug)]
196pub struct StdCommandProcess {
197 #[derivative(Debug = "ignore")]
198 pub stdin: Box<dyn Write>,
199 #[derivative(Debug = "ignore")]
200 pub stdout: Box<dyn Read>,
201 pub pid: u32,
202 pub end_rx: Receiver<ProcessEnd>,
203 pub end_handler: JoinHandle<()>,
204}
205
/// Writer that accepts and discards every byte handed to it.
struct SyncSink;

impl Write for SyncSink {
    /// Reports the whole buffer as written without storing anything.
    fn write(&mut self, bytes: &[u8]) -> io::Result<usize> {
        let accepted = bytes.len();
        Ok(accepted)
    }

    /// Nothing is buffered, so flushing always succeeds.
    fn flush(&mut self) -> io::Result<()> {
        io::Result::Ok(())
    }
}
215}