1use std::ffi::{OsStr, OsString};
2use std::fmt::{Display, Formatter};
3use std::io;
4use std::io::{BufRead, BufReader, ErrorKind};
5use std::path::Path;
6use std::process::{ChildStderr, ChildStdout, Command, ExitStatus, Output, Stdio};
7use std::sync::{Arc, Condvar, Mutex};
8use std::time::Duration;
9
10use crossbeam::channel::Receiver;
11use crossbeam_channel::{tick, Select};
12use tracing::{error, trace, warn};
13
14use crate::debug::CommandDebug;
15use crate::errors::CmdError;
16use crate::{Cmd, CommandBuilder, Error, OutputResult, Vec8ToString};
17
18impl Display for Cmd {
19 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
20 write!(f, "{:?} {:?}", self.program, self.args)
21 }
22}
23
24impl Display for CommandBuilder {
25 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26 write!(
27 f,
28 "{:} {:}",
29 self.program.to_str().unwrap(),
30 self.args.join(OsStr::new(" ")).to_str().unwrap()
31 )
32 }
33}
34
35impl OutputResult for Output {
36 fn to_result(&self) -> crate::Result<Vec<u8>> {
37 if self.status.success() && self.stderr.is_empty() {
38 Ok(self.stdout.to_owned())
39 } else {
40 Err(crate::Error::CommandError(CmdError::from_err(
41 self.status,
42 self.stdout.to_owned(),
43 self.stderr.to_owned(),
44 )))
45 }
46 }
47
48 fn try_to_result(&self) -> crate::Result<Vec<u8>> {
49 if self.status.code().is_none() && self.stderr.is_empty() {
50 Ok(self.stdout.to_owned())
51 } else {
52 Err(crate::Error::CommandError(CmdError::from_err(
53 self.status,
54 self.stdout.to_owned(),
55 self.stderr.to_owned(),
56 )))
57 }
58 }
59}
60
61impl CommandBuilder {
62 pub fn new<S: AsRef<OsStr>>(program: S) -> CommandBuilder {
63 CommandBuilder {
64 program: OsString::from(program.as_ref()),
65 timeout: None,
66 cwd: None,
67 debug: false,
68 args: vec![],
69 stdin: None,
70 stdout: Some(Stdio::piped()),
71 stderr: Some(Stdio::piped()),
72 signal: None,
73 }
74 }
75
76 pub fn with_debug(mut self, debug: bool) -> Self {
77 self.debug = debug;
78 self
79 }
80
81 pub fn with_timeout(mut self, duration: Duration) -> Self {
82 self.timeout = Some(duration);
83 self
84 }
85
86 pub fn timeout(mut self, duration: Option<Duration>) -> Self {
87 self.timeout = duration;
88 self
89 }
90
91 pub fn with_signal(mut self, signal: Receiver<()>) -> Self {
92 self.signal = Some(signal);
93 self
94 }
95
96 pub fn signal(mut self, signal: Option<Receiver<()>>) -> Self {
97 self.signal = signal;
98 self
99 }
100
101 pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
102 self.args.push(arg.as_ref().into());
103 self
104 }
105
106 pub fn with_arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
107 self.args.push(arg.as_ref().into());
108 self
109 }
110
111 pub fn args<I, S>(mut self, args: I) -> Self
112 where
113 I: IntoIterator<Item = S>,
114 S: AsRef<OsStr>,
115 {
116 for arg in args {
117 self.args.push(arg.as_ref().into());
118 }
119 self
120 }
121
122 pub fn with_args<I, S>(mut self, args: I) -> Self
123 where
124 I: IntoIterator<Item = S>,
125 S: AsRef<OsStr>,
126 {
127 for arg in args {
128 self.args.push(arg.as_ref().into());
129 }
130 self
131 }
132
133 pub fn stdout<T: Into<Stdio>>(mut self, cfg: Option<T>) -> Self {
134 if let Some(cfg) = cfg {
135 self.stdout = Some(cfg.into());
136 } else {
137 self.stdout = None;
138 }
139 self
140 }
141
142 pub fn stderr<T: Into<Stdio>>(mut self, cfg: Option<T>) -> Self {
143 if let Some(cfg) = cfg {
144 self.stderr = Some(cfg.into());
145 } else {
146 self.stderr = None;
147 }
148 self
149 }
150
151 pub fn stdin<T: Into<Stdio>>(mut self, cfg: Option<T>) -> Self {
152 if let Some(cfg) = cfg {
153 self.stdin = Some(cfg.into());
154 } else {
155 self.stdin = None;
156 }
157 self
158 }
159
160 pub fn current_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
161 self.cwd = Some(dir.as_ref().into());
162 self
163 }
164
165 pub fn get_current_dir(&self) -> Option<&Path> {
166 self.cwd.as_ref().map(|cs| Path::new(cs))
167 }
168
169 pub fn build(mut self) -> Cmd {
170 return Cmd {
171 debug: self.debug,
172 program: self.program.to_owned(),
173 args: self.args.to_owned(),
174 stdin: self.stdin.take(),
175 stdout: self.stdout.take(),
176 stderr: self.stderr.take(),
177 timeout: self.timeout.take(),
178 signal: self.signal.take(),
179 cwd: self.cwd.take(),
180 };
181 }
182}
183
184impl Cmd {
185 pub fn builder<S: AsRef<OsStr>>(program: S) -> CommandBuilder {
188 CommandBuilder::new(program)
189 }
190
191 pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
192 Cmd {
193 program: OsString::from(program.as_ref()),
194 cwd: None,
195 timeout: None,
196 debug: false,
197 args: vec![],
198 stdin: None,
199 stdout: None,
200 stderr: None,
201 signal: None,
202 }
203 }
204
205 pub fn command(mut self) -> Command {
206 let mut command = Command::new(self.program.to_os_string());
207 command.args(self.args.clone());
208
209 if let Some(stdin) = self.stdin.take() {
210 command.stdin(stdin);
211 }
212
213 if let Some(stdout) = self.stdout.take() {
214 command.stdout(stdout);
215 }
216
217 if let Some(stderr) = self.stderr.take() {
218 command.stderr(stderr);
219 }
220
221 if let Some(cwd) = self.cwd.take() {
222 command.current_dir(cwd);
223 }
224
225 command
226 }
227
228 pub fn run(mut self) -> crate::Result<Option<ExitStatus>> {
231 if self.debug {
232 self.debug();
233 }
234
235 let mut command = self.command();
236 let mut child = command.spawn().unwrap();
237 drop(command);
238 child.try_wait().map_err(|e| crate::Error::IoError(e))
239 }
240
241 pub fn output(self) -> crate::Result<Output> {
242 self.wait_for_output()
243 }
244
245 pub(crate) fn wait_for_output(mut self) -> crate::Result<Output> {
246 let has_debug = self.debug;
247 if has_debug {
248 self.debug();
249 }
250
251 let cancel_signal = self.signal.take();
252 let ticks = self.timeout.take().map(|t| tick(t));
253
254 let mut command = self.command();
255 let mut child = command.spawn().unwrap();
256
257 let stdout = child.stdout.take();
258 let stderr = child.stderr.take();
259
260 let status_receiver = Arc::new((Mutex::new(None), Condvar::new()));
261 let status_receiver_cloned = Arc::clone(&status_receiver);
262
263 drop(command);
264
265 let local_thread = std::thread::Builder::new().name("cmd_wait".to_string()).spawn(move || {
266 let (lock, condvar) = &*status_receiver_cloned;
267 let mut status_mutex = lock.lock().unwrap();
268
269 let mut sel = Select::new();
270 let mut oper_cancel: Option<usize> = None;
271 let mut oper_timeout: Option<usize> = None;
272
273 if cancel_signal.is_some() {
274 oper_cancel = Some(sel.recv(cancel_signal.as_ref().unwrap()));
275 }
276
277 if ticks.is_some() {
278 oper_timeout = Some(sel.recv(ticks.as_ref().unwrap()));
279 }
280
281 let mut killed = false;
282
283 loop {
284 match sel.try_ready() {
285 Err(_) => {
286 if let Ok(Some(status)) = child.try_wait() {
287 *status_mutex = Some(status);
288 condvar.notify_one();
289 break;
290 }
291 }
292
293 Ok(i) if !killed && oper_cancel.is_some() && i == oper_cancel.unwrap() => {
294 if has_debug {
295 warn!("ctrl+c received");
296 }
297 sel.remove(oper_cancel.unwrap());
298 let _ = child.kill();
299 killed = true;
300 }
301
302 Ok(i) if !killed && oper_timeout.is_some() && i == oper_timeout.unwrap() => {
303 if has_debug {
304 warn!("command timeout! killing the process...");
305 }
306 sel.remove(oper_timeout.unwrap());
307 let _ = child.kill();
308 killed = true;
309 }
310
311 Ok(i) => {
312 if has_debug {
313 warn!("Invalid operation index {i}!");
314 }
315 break;
316 }
317 }
318 }
319 })?;
320
321 let output = Cmd::read_to_end(stdout, stderr);
323
324 if let Err(_err) = local_thread.join() {
326 warn!("failed to join the thread!");
327 }
328
329 let (lock, cvar) = &*status_receiver;
331 let mut status = lock.lock().unwrap();
332 while status.is_none() {
333 (status, _) = cvar.wait_timeout(status, Duration::from_secs(1)).unwrap();
334 break;
335 }
337
338 match output {
341 Ok(output) => Ok(Output {
342 status: status.unwrap(),
343 stdout: output.0,
344 stderr: output.1,
345 }),
346 Err(e) => Err(e),
347 }
348 }
349
350 pub fn read_to_end(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> crate::Result<(Vec<u8>, Vec<u8>)> {
351 let mut stdout_writer: Vec<u8> = Vec::new();
352 let mut stderr_writer: Vec<u8> = Vec::new();
353
354 if let Some(stdout) = stdout {
355 let stdout_reader = BufReader::new(stdout);
356 for line in <BufReader<ChildStdout> as BufReaderExt<BufReader<ChildStdout>>>::lines_vec(stdout_reader) {
357 stdout_writer.extend(line?);
358 }
359 }
360
361 if let Some(stderr) = stderr {
362 let stderr_reader = BufReader::new(stderr);
363 for line in <BufReader<ChildStderr> as BufReaderExt<BufReader<ChildStderr>>>::lines_vec(stderr_reader) {
364 stderr_writer.extend(line?);
365 }
366 }
367
368 Ok((stdout_writer, stderr_writer))
369 }
370
371 pub fn pipe<T>(mut self, cmd2: T) -> Result<Output, Error>
372 where
373 T: Into<Command>,
374 {
375 let mut other = cmd2.into();
376
377 if self.debug {
378 let s1 = self.as_string();
379 let s2 = other.as_string();
380 trace!("Executing `{s1} | {s2}`...");
381 }
382
383 let cancel_signal = self.signal.take();
384 let ticks = self.timeout.take().map(|t| tick(t));
385
386 let mut command1 = self.command();
387 let mut child1 = command1.spawn().unwrap();
388
389 let child1_stdout: ChildStdout = child1
390 .stdout
391 .take()
392 .ok_or(io::Error::new(ErrorKind::InvalidData, "child stdout unavailable"))?;
393
394 let fd: Stdio = child1_stdout.try_into().unwrap();
395
396 other.stdin(fd);
397
398 let mut child2 = other.spawn().unwrap();
399
400 let stdout = child2.stdout.take();
401 let stderr = child2.stderr.take();
402
403 let status_receiver = Arc::new((Mutex::new(None), Condvar::new()));
404 let status_receiver_cloned = Arc::clone(&status_receiver);
405
406 drop(command1);
407 drop(other);
408
409 let local_thread = std::thread::Builder::new().name("cmd_wait".to_string()).spawn(move || {
410 let (lock, condvar) = &*status_receiver_cloned;
411 let mut status_mutex = lock.lock().unwrap();
412
413 let mut sel = Select::new();
414 let mut oper_cancel: Option<usize> = None;
415 let mut oper_timeout: Option<usize> = None;
416
417 if cancel_signal.is_some() {
418 oper_cancel = Some(sel.recv(cancel_signal.as_ref().unwrap()));
419 }
420
421 if ticks.is_some() {
422 oper_timeout = Some(sel.recv(ticks.as_ref().unwrap()));
423 }
424
425 let mut killed = false;
426
427 loop {
428 match sel.try_ready() {
429 Err(_) => {
430 if let Ok(Some(status)) = child2.try_wait() {
431 let _ = child1.kill();
433 *status_mutex = Some(status);
434 condvar.notify_one();
435 break;
436 }
437
438 if !killed {
439 if let Ok(Some(_status1)) = child1.try_wait() {
440 if let Ok(Some(_status)) = child2.try_wait() {
442 killed = true;
444 } else {
445 killed = true;
448 }
449 }
450 }
451 }
452
453 Ok(i) if !killed && oper_cancel.is_some() && i == oper_cancel.unwrap() => {
454 sel.remove(oper_cancel.unwrap());
455 let _ = child1.kill();
456 let _ = child2.kill();
457 killed = true;
458 }
459
460 Ok(i) if !killed && oper_timeout.is_some() && i == oper_timeout.unwrap() => {
461 sel.remove(oper_timeout.unwrap());
462 let _ = child1.kill();
463 let _ = child2.kill();
464 killed = true;
465 }
466
467 Ok(i) => {
468 error!("Invalid operation index {i}!");
469 break;
470 }
471 }
472 }
473 })?;
474
475 let output = Cmd::read_to_end(stdout, stderr);
477
478 if let Err(_err) = local_thread.join() {
480 warn!("failed to join the thread!");
481 }
482
483 let (lock, cvar) = &*status_receiver;
485 let mut status = lock.lock().unwrap();
486 while status.is_none() {
487 (status, _) = cvar.wait_timeout(status, Duration::from_secs(1)).unwrap();
488 break;
489 }
490
491 match output {
492 Ok(output) => Ok(Output {
493 status: status.unwrap(),
494 stdout: output.0,
495 stderr: output.1,
496 }),
497 Err(e) => Err(e),
498 }
499 }
500}
501
502impl Vec8ToString for Vec<u8> {
503 fn as_str(&self) -> Option<&str> {
504 match std::str::from_utf8(self) {
505 Ok(s) => Some(s),
506 Err(_) => None,
507 }
508 }
509}
510
511pub(crate) trait BufReaderExt<B: BufRead> {
512 fn lines_vec(self) -> LinesVec<Self>
513 where
514 Self: Sized;
515}
516
517pub struct LinesVec<B> {
518 buf: B,
519}
520
521impl<B: BufRead, R> BufReaderExt<B> for BufReader<R> {
522 fn lines_vec(self) -> LinesVec<Self>
523 where
524 Self: Sized,
525 {
526 LinesVec { buf: self }
527 }
528}
529
530impl<B: BufRead> Iterator for LinesVec<B> {
531 type Item = io::Result<Vec<u8>>;
532
533 fn next(&mut self) -> Option<std::io::Result<Vec<u8>>> {
534 let mut buf = Vec::new();
535 match self.buf.read_until(b'\n', &mut buf) {
536 Ok(0) => None,
537 Ok(_n) => Some(Ok(buf)),
538 Err(e) => Some(Err(e)),
539 }
540 }
541}
542
543impl From<CommandBuilder> for Command {
544 fn from(value: CommandBuilder) -> Self {
545 let mut command = Command::new(value.program.to_os_string());
546 command.args(value.args.to_vec());
547
548 if let Some(stdin) = value.stdin {
549 command.stdin(Stdio::from(stdin));
550 }
551
552 if let Some(stdout) = value.stdout {
553 command.stdout(Stdio::from(stdout));
554 }
555
556 if let Some(stderr) = value.stderr {
557 command.stderr(Stdio::from(stderr));
558 }
559 command
560 }
561}
562
563impl From<Cmd> for Command {
564 fn from(value: Cmd) -> Self {
565 let mut command = Command::new(value.program.to_os_string());
566 command.args(value.args.to_vec());
567
568 if let Some(stdin) = value.stdin {
569 command.stdin(Stdio::from(stdin));
570 }
571
572 if let Some(stdout) = value.stdout {
573 command.stdout(Stdio::from(stdout));
574 }
575
576 if let Some(stderr) = value.stderr {
577 command.stderr(Stdio::from(stderr));
578 }
579 command
580 }
581}