cmd_wrapper/
std_command.rs1use std::{
2 ffi::OsStr,
3 io::{self, Error, Read, Result, Write},
4 path::Path,
5 process::{Child, Command, ExitStatus, Stdio},
6 sync::mpsc::{self, Receiver},
7 thread::{self, JoinHandle},
8};
9
10use derivative::Derivative;
11use tracing::{instrument, warn};
12
13#[derive(Debug)]
14pub struct StdCommand {
15 command: Command,
16}
17
18impl StdCommand {
19 pub fn new<S: Into<String>>(program: S) -> Self {
20 Self {
21 command: Command::new(program.into()),
22 }
23 }
24
25 pub fn arg<S: AsRef<OsStr>>(&mut self, arg: S) -> &mut Self {
26 self.command.arg(arg);
27 self
28 }
29
30 pub fn args<I, S>(&mut self, args: I) -> &mut Self
31 where
32 I: IntoIterator<Item = S>,
33 S: AsRef<OsStr>,
34 {
35 self.command.args(args);
36 self
37 }
38
39 pub fn env<K, V>(&mut self, key: K, val: V) -> &mut Self
40 where
41 K: AsRef<OsStr>,
42 V: AsRef<OsStr>,
43 {
44 self.command.env(key, val);
45 self
46 }
47
48 pub fn envs<I, K, V>(&mut self, vars: I) -> &mut Self
49 where
50 I: IntoIterator<Item = (K, V)>,
51 K: AsRef<OsStr>,
52 V: AsRef<OsStr>,
53 {
54 self.command.envs(vars);
55 self
56 }
57
58 pub fn current_dir<P: AsRef<Path>>(&mut self, dir: P) -> &mut Self {
59 self.command.current_dir(dir);
60 self
61 }
62
63 pub fn stdin<T: Into<Stdio>>(&mut self, stdin: T) -> &mut Self {
64 self.command.stdin(stdin);
65 self
66 }
67
68 pub fn stdout<T: Into<Stdio>>(&mut self, stdout: T) -> &mut Self {
69 self.command.stdout(stdout);
70 self
71 }
72
73 pub fn stderr<T: Into<Stdio>>(&mut self, stderr: T) -> &mut Self {
74 self.command.stderr(stderr);
75 self
76 }
77
78 #[instrument(level = "trace")]
79 pub fn into_inner(self) -> Command {
80 self.command
81 }
82
83 #[instrument(level = "trace")]
84 pub fn child(mut self) -> Result<Child> {
85 self.command.spawn()
86 }
87
88 #[instrument(level = "trace")]
89 pub fn spawn(&mut self) -> Result<StdCommandProcess> {
90 let mut child = self.command.spawn()?;
91
92 let stdin = child
93 .stdin
94 .take()
95 .map(|s| Box::new(s) as Box<dyn Write>)
96 .unwrap_or_else(|| Box::new(SyncSink));
97 let stdout = child
98 .stdout
99 .take()
100 .map(|s| Box::new(s) as Box<dyn Read>)
101 .unwrap_or_else(|| Box::new(io::empty()));
102 let stderr = child
103 .stderr
104 .take()
105 .map(|s| Box::new(s) as Box<dyn Read + Send>)
106 .unwrap_or_else(|| Box::new(io::empty()));
107
108 let pid = child.id();
109
110 let (end_tx, end_rx) = mpsc::channel::<(Option<i32>, Option<io::Error>)>();
111 let end_handler = thread::spawn(move || {
112 let end_msg = match child.wait() {
113 Ok(status) if status.code() == Some(0) => (Some(0), None),
114 Ok(status) => (status.code(), Self::read_full_stderr_if_any(stderr).err()),
115 Err(err) => (Some(-1), Some(err)),
116 };
117
118 if let Err(err) = end_tx.send(end_msg) {
119 warn!(?err, "receiver dropped before sending end msg");
120 }
121 });
122
123 Ok(StdCommandProcess {
124 stdin,
125 stdout,
126 pid,
127 end_rx,
128 end_handler,
129 })
130 }
131
132 #[instrument(level = "trace")]
133 pub fn output(&mut self) -> Result<String> {
134 let output = self.command.output()?;
135
136 if !output.status.success() {
137 let stderr = String::from_utf8_lossy(&output.stderr);
138 return Err(Error::other(stderr));
139 }
140
141 Ok(String::from_utf8_lossy(&output.stdout).to_string())
142 }
143
144 #[instrument(level = "trace")]
145 pub fn status(&mut self) -> Result<ExitStatus> {
146 self.command.status()
147 }
148
149 #[instrument(level = "trace")]
150 pub fn execute_and_print(&mut self) -> Result<()> {
151 let output = self.command.output()?;
152 if !output.stdout.is_empty() {
153 io::stdout().write_all(&output.stdout)?;
154 }
155 if !output.stderr.is_empty() {
156 io::stderr().write_all(&output.stderr)?;
157 }
158 Ok(())
159 }
160
161 fn read_full_stderr_if_any<R: Read + Send>(mut stderr: R) -> Result<()> {
162 let mut peek_buf = vec![0u8; 1024];
163 let n = stderr.read(&mut peek_buf)?;
164 if n == 0 {
165 return Ok(());
166 }
167
168 let mut full = peek_buf[..n].to_vec();
169 let mut rest = Vec::new();
170 stderr.read_to_end(&mut rest)?;
171 full.extend(rest);
172
173 let msg = String::from_utf8_lossy(&full).trim().to_string();
174 if !msg.is_empty() {
175 return Err(Error::other(msg));
176 }
177
178 Ok(())
179 }
180}
181
182#[derive(Derivative)]
183#[derivative(Debug)]
184pub struct StdCommandProcess {
185 #[derivative(Debug = "ignore")]
186 pub stdin: Box<dyn Write>,
187 #[derivative(Debug = "ignore")]
188 pub stdout: Box<dyn Read>,
189 pub pid: u32,
190 pub end_rx: Receiver<(Option<i32>, Option<io::Error>)>,
191 pub end_handler: JoinHandle<()>,
192}
193
194struct SyncSink;
195
196impl Write for SyncSink {
197 fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
198 Ok(buf.len())
199 }
200 fn flush(&mut self) -> io::Result<()> {
201 Ok(())
202 }
203}