simple_cmd/
impls.rs

1use std::ffi::{OsStr, OsString};
2use std::fmt::{Display, Formatter};
3use std::io;
4use std::io::{BufRead, BufReader, ErrorKind};
5use std::path::Path;
6use std::process::{ChildStderr, ChildStdout, Command, ExitStatus, Output, Stdio};
7use std::sync::{Arc, Condvar, Mutex};
8use std::time::Duration;
9
10use crossbeam::channel::Receiver;
11use crossbeam_channel::{tick, Select};
12use tracing::{error, trace, warn};
13
14use crate::debug::CommandDebug;
15use crate::errors::CmdError;
16use crate::{Cmd, CommandBuilder, Error, OutputResult, Vec8ToString};
17
18impl Display for Cmd {
19	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
20		write!(f, "{:?} {:?}", self.program, self.args)
21	}
22}
23
24impl Display for CommandBuilder {
25	fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
26		write!(
27			f,
28			"{:} {:}",
29			self.program.to_str().unwrap(),
30			self.args.join(OsStr::new(" ")).to_str().unwrap()
31		)
32	}
33}
34
35impl OutputResult for Output {
36	fn to_result(&self) -> crate::Result<Vec<u8>> {
37		if self.status.success() && self.stderr.is_empty() {
38			Ok(self.stdout.to_owned())
39		} else {
40			Err(crate::Error::CommandError(CmdError::from_err(
41				self.status,
42				self.stdout.to_owned(),
43				self.stderr.to_owned(),
44			)))
45		}
46	}
47
48	fn try_to_result(&self) -> crate::Result<Vec<u8>> {
49		if self.status.code().is_none() && self.stderr.is_empty() {
50			Ok(self.stdout.to_owned())
51		} else {
52			Err(crate::Error::CommandError(CmdError::from_err(
53				self.status,
54				self.stdout.to_owned(),
55				self.stderr.to_owned(),
56			)))
57		}
58	}
59}
60
61impl CommandBuilder {
62	pub fn new<S: AsRef<OsStr>>(program: S) -> CommandBuilder {
63		CommandBuilder {
64			program: OsString::from(program.as_ref()),
65			timeout: None,
66			cwd: None,
67			debug: false,
68			args: vec![],
69			stdin: None,
70			stdout: Some(Stdio::piped()),
71			stderr: Some(Stdio::piped()),
72			signal: None,
73		}
74	}
75
76	pub fn with_debug(mut self, debug: bool) -> Self {
77		self.debug = debug;
78		self
79	}
80
81	pub fn with_timeout(mut self, duration: Duration) -> Self {
82		self.timeout = Some(duration);
83		self
84	}
85
86	pub fn timeout(mut self, duration: Option<Duration>) -> Self {
87		self.timeout = duration;
88		self
89	}
90
91	pub fn with_signal(mut self, signal: Receiver<()>) -> Self {
92		self.signal = Some(signal);
93		self
94	}
95
96	pub fn signal(mut self, signal: Option<Receiver<()>>) -> Self {
97		self.signal = signal;
98		self
99	}
100
101	pub fn arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
102		self.args.push(arg.as_ref().into());
103		self
104	}
105
106	pub fn with_arg<S: AsRef<OsStr>>(mut self, arg: S) -> Self {
107		self.args.push(arg.as_ref().into());
108		self
109	}
110
111	pub fn args<I, S>(mut self, args: I) -> Self
112	where
113		I: IntoIterator<Item = S>,
114		S: AsRef<OsStr>,
115	{
116		for arg in args {
117			self.args.push(arg.as_ref().into());
118		}
119		self
120	}
121
122	pub fn with_args<I, S>(mut self, args: I) -> Self
123	where
124		I: IntoIterator<Item = S>,
125		S: AsRef<OsStr>,
126	{
127		for arg in args {
128			self.args.push(arg.as_ref().into());
129		}
130		self
131	}
132
133	pub fn stdout<T: Into<Stdio>>(mut self, cfg: Option<T>) -> Self {
134		if let Some(cfg) = cfg {
135			self.stdout = Some(cfg.into());
136		} else {
137			self.stdout = None;
138		}
139		self
140	}
141
142	pub fn stderr<T: Into<Stdio>>(mut self, cfg: Option<T>) -> Self {
143		if let Some(cfg) = cfg {
144			self.stderr = Some(cfg.into());
145		} else {
146			self.stderr = None;
147		}
148		self
149	}
150
151	pub fn stdin<T: Into<Stdio>>(mut self, cfg: Option<T>) -> Self {
152		if let Some(cfg) = cfg {
153			self.stdin = Some(cfg.into());
154		} else {
155			self.stdin = None;
156		}
157		self
158	}
159
160	pub fn current_dir<P: AsRef<Path>>(mut self, dir: P) -> Self {
161		self.cwd = Some(dir.as_ref().into());
162		self
163	}
164
165	pub fn get_current_dir(&self) -> Option<&Path> {
166		self.cwd.as_ref().map(|cs| Path::new(cs))
167	}
168
169	pub fn build(mut self) -> Cmd {
170		return Cmd {
171			debug: self.debug,
172			program: self.program.to_owned(),
173			args: self.args.to_owned(),
174			stdin: self.stdin.take(),
175			stdout: self.stdout.take(),
176			stderr: self.stderr.take(),
177			timeout: self.timeout.take(),
178			signal: self.signal.take(),
179			cwd: self.cwd.take(),
180		};
181	}
182}
183
184impl Cmd {
185	// region public methods
186
187	pub fn builder<S: AsRef<OsStr>>(program: S) -> CommandBuilder {
188		CommandBuilder::new(program)
189	}
190
191	pub fn new<S: AsRef<OsStr>>(program: S) -> Self {
192		Cmd {
193			program: OsString::from(program.as_ref()),
194			cwd: None,
195			timeout: None,
196			debug: false,
197			args: vec![],
198			stdin: None,
199			stdout: None,
200			stderr: None,
201			signal: None,
202		}
203	}
204
205	pub fn command(mut self) -> Command {
206		let mut command = Command::new(self.program.to_os_string());
207		command.args(self.args.clone());
208
209		if let Some(stdin) = self.stdin.take() {
210			command.stdin(stdin);
211		}
212
213		if let Some(stdout) = self.stdout.take() {
214			command.stdout(stdout);
215		}
216
217		if let Some(stderr) = self.stderr.take() {
218			command.stderr(stderr);
219		}
220
221		if let Some(cwd) = self.cwd.take() {
222			command.current_dir(cwd);
223		}
224
225		command
226	}
227
228	// endregion public methods
229
230	pub fn run(mut self) -> crate::Result<Option<ExitStatus>> {
231		if self.debug {
232			self.debug();
233		}
234
235		let mut command = self.command();
236		let mut child = command.spawn().unwrap();
237		drop(command);
238		child.try_wait().map_err(|e| crate::Error::IoError(e))
239	}
240
241	pub fn output(self) -> crate::Result<Output> {
242		self.wait_for_output()
243	}
244
245	pub(crate) fn wait_for_output(mut self) -> crate::Result<Output> {
246		let has_debug = self.debug;
247		if has_debug {
248			self.debug();
249		}
250
251		let cancel_signal = self.signal.take();
252		let ticks = self.timeout.take().map(|t| tick(t));
253
254		let mut command = self.command();
255		let mut child = command.spawn().unwrap();
256
257		let stdout = child.stdout.take();
258		let stderr = child.stderr.take();
259
260		let status_receiver = Arc::new((Mutex::new(None), Condvar::new()));
261		let status_receiver_cloned = Arc::clone(&status_receiver);
262
263		drop(command);
264
265		let local_thread = std::thread::Builder::new().name("cmd_wait".to_string()).spawn(move || {
266			let (lock, condvar) = &*status_receiver_cloned;
267			let mut status_mutex = lock.lock().unwrap();
268
269			let mut sel = Select::new();
270			let mut oper_cancel: Option<usize> = None;
271			let mut oper_timeout: Option<usize> = None;
272
273			if cancel_signal.is_some() {
274				oper_cancel = Some(sel.recv(cancel_signal.as_ref().unwrap()));
275			}
276
277			if ticks.is_some() {
278				oper_timeout = Some(sel.recv(ticks.as_ref().unwrap()));
279			}
280
281			let mut killed = false;
282
283			loop {
284				match sel.try_ready() {
285					Err(_) => {
286						if let Ok(Some(status)) = child.try_wait() {
287							*status_mutex = Some(status);
288							condvar.notify_one();
289							break;
290						}
291					}
292
293					Ok(i) if !killed && oper_cancel.is_some() && i == oper_cancel.unwrap() => {
294						if has_debug {
295							warn!("ctrl+c received");
296						}
297						sel.remove(oper_cancel.unwrap());
298						let _ = child.kill();
299						killed = true;
300					}
301
302					Ok(i) if !killed && oper_timeout.is_some() && i == oper_timeout.unwrap() => {
303						if has_debug {
304							warn!("command timeout! killing the process...");
305						}
306						sel.remove(oper_timeout.unwrap());
307						let _ = child.kill();
308						killed = true;
309					}
310
311					Ok(i) => {
312						if has_debug {
313							warn!("Invalid operation index {i}!");
314						}
315						break;
316					}
317				}
318			}
319		})?;
320
321		// start collecting the stdout and stderr from the child process
322		let output = Cmd::read_to_end(stdout, stderr);
323
324		// wait for the local thread to complete
325		if let Err(_err) = local_thread.join() {
326			warn!("failed to join the thread!");
327		}
328
329		// Wait for the thread to complete.
330		let (lock, cvar) = &*status_receiver;
331		let mut status = lock.lock().unwrap();
332		while status.is_none() {
333			(status, _) = cvar.wait_timeout(status, Duration::from_secs(1)).unwrap();
334			break;
335			//status = cvar.wait(status).unwrap();
336		}
337
338		//trace!("final exit status is: {status:?}");
339
340		match output {
341			Ok(output) => Ok(Output {
342				status: status.unwrap(),
343				stdout: output.0,
344				stderr: output.1,
345			}),
346			Err(e) => Err(e),
347		}
348	}
349
350	pub fn read_to_end(stdout: Option<ChildStdout>, stderr: Option<ChildStderr>) -> crate::Result<(Vec<u8>, Vec<u8>)> {
351		let mut stdout_writer: Vec<u8> = Vec::new();
352		let mut stderr_writer: Vec<u8> = Vec::new();
353
354		if let Some(stdout) = stdout {
355			let stdout_reader = BufReader::new(stdout);
356			for line in <BufReader<ChildStdout> as BufReaderExt<BufReader<ChildStdout>>>::lines_vec(stdout_reader) {
357				stdout_writer.extend(line?);
358			}
359		}
360
361		if let Some(stderr) = stderr {
362			let stderr_reader = BufReader::new(stderr);
363			for line in <BufReader<ChildStderr> as BufReaderExt<BufReader<ChildStderr>>>::lines_vec(stderr_reader) {
364				stderr_writer.extend(line?);
365			}
366		}
367
368		Ok((stdout_writer, stderr_writer))
369	}
370
371	pub fn pipe<T>(mut self, cmd2: T) -> Result<Output, Error>
372	where
373		T: Into<Command>,
374	{
375		let mut other = cmd2.into();
376
377		if self.debug {
378			let s1 = self.as_string();
379			let s2 = other.as_string();
380			trace!("Executing `{s1} | {s2}`...");
381		}
382
383		let cancel_signal = self.signal.take();
384		let ticks = self.timeout.take().map(|t| tick(t));
385
386		let mut command1 = self.command();
387		let mut child1 = command1.spawn().unwrap();
388
389		let child1_stdout: ChildStdout = child1
390			.stdout
391			.take()
392			.ok_or(io::Error::new(ErrorKind::InvalidData, "child stdout unavailable"))?;
393
394		let fd: Stdio = child1_stdout.try_into().unwrap();
395
396		other.stdin(fd);
397
398		let mut child2 = other.spawn().unwrap();
399
400		let stdout = child2.stdout.take();
401		let stderr = child2.stderr.take();
402
403		let status_receiver = Arc::new((Mutex::new(None), Condvar::new()));
404		let status_receiver_cloned = Arc::clone(&status_receiver);
405
406		drop(command1);
407		drop(other);
408
409		let local_thread = std::thread::Builder::new().name("cmd_wait".to_string()).spawn(move || {
410			let (lock, condvar) = &*status_receiver_cloned;
411			let mut status_mutex = lock.lock().unwrap();
412
413			let mut sel = Select::new();
414			let mut oper_cancel: Option<usize> = None;
415			let mut oper_timeout: Option<usize> = None;
416
417			if cancel_signal.is_some() {
418				oper_cancel = Some(sel.recv(cancel_signal.as_ref().unwrap()));
419			}
420
421			if ticks.is_some() {
422				oper_timeout = Some(sel.recv(ticks.as_ref().unwrap()));
423			}
424
425			let mut killed = false;
426
427			loop {
428				match sel.try_ready() {
429					Err(_) => {
430						if let Ok(Some(status)) = child2.try_wait() {
431							//warn!("exit status received:/**/ {:?}", status);
432							let _ = child1.kill();
433							*status_mutex = Some(status);
434							condvar.notify_one();
435							break;
436						}
437
438						if !killed {
439							if let Ok(Some(_status1)) = child1.try_wait() {
440								//warn!("[1] exit status received: {:?}", status1);
441								if let Ok(Some(_status)) = child2.try_wait() {
442									//warn!("[2] exit status received: {:?}", _status);
443									killed = true;
444								} else {
445									//warn!("killing child2..");
446									//let _ = child2.kill();
447									killed = true;
448								}
449							}
450						}
451					}
452
453					Ok(i) if !killed && oper_cancel.is_some() && i == oper_cancel.unwrap() => {
454						sel.remove(oper_cancel.unwrap());
455						let _ = child1.kill();
456						let _ = child2.kill();
457						killed = true;
458					}
459
460					Ok(i) if !killed && oper_timeout.is_some() && i == oper_timeout.unwrap() => {
461						sel.remove(oper_timeout.unwrap());
462						let _ = child1.kill();
463						let _ = child2.kill();
464						killed = true;
465					}
466
467					Ok(i) => {
468						error!("Invalid operation index {i}!");
469						break;
470					}
471				}
472			}
473		})?;
474
475		// start collecting the stdout and stderr from the child process
476		let output = Cmd::read_to_end(stdout, stderr);
477
478		// wait for the local thread to complete
479		if let Err(_err) = local_thread.join() {
480			warn!("failed to join the thread!");
481		}
482
483		// Wait for the thread to complete.
484		let (lock, cvar) = &*status_receiver;
485		let mut status = lock.lock().unwrap();
486		while status.is_none() {
487			(status, _) = cvar.wait_timeout(status, Duration::from_secs(1)).unwrap();
488			break;
489		}
490
491		match output {
492			Ok(output) => Ok(Output {
493				status: status.unwrap(),
494				stdout: output.0,
495				stderr: output.1,
496			}),
497			Err(e) => Err(e),
498		}
499	}
500}
501
502impl Vec8ToString for Vec<u8> {
503	fn as_str(&self) -> Option<&str> {
504		match std::str::from_utf8(self) {
505			Ok(s) => Some(s),
506			Err(_) => None,
507		}
508	}
509}
510
/// Extension that turns a buffered reader into an iterator over raw lines.
///
/// Unlike `BufRead::lines`, the yielded items are byte vectors, so the
/// content does not have to be valid UTF-8.
pub(crate) trait BufReaderExt<B: BufRead> {
	/// Consumes the reader and returns an iterator over its lines.
	fn lines_vec(self) -> LinesVec<Self>
	where
		Self: Sized;
}

/// Iterator over the lines of a reader, yielded as `Vec<u8>`.
///
/// Each item keeps its trailing `\n` when the input had one.
pub struct LinesVec<B> {
	// Underlying buffered reader the lines are pulled from.
	buf: B,
}

impl<B: BufRead, R> BufReaderExt<B> for BufReader<R> {
	fn lines_vec(self) -> LinesVec<Self>
	where
		Self: Sized,
	{
		let buf = self;
		LinesVec { buf }
	}
}

impl<B: BufRead> Iterator for LinesVec<B> {
	type Item = io::Result<Vec<u8>>;

	/// Reads up to and including the next `\n`. Iteration ends when the
	/// reader reports zero bytes read; read errors are yielded as items.
	fn next(&mut self) -> Option<Self::Item> {
		let mut line = Vec::new();
		let read = self.buf.read_until(b'\n', &mut line);
		match read {
			Err(err) => Some(Err(err)),
			Ok(0) => None,
			Ok(_) => Some(Ok(line)),
		}
	}
}
542
543impl From<CommandBuilder> for Command {
544	fn from(value: CommandBuilder) -> Self {
545		let mut command = Command::new(value.program.to_os_string());
546		command.args(value.args.to_vec());
547
548		if let Some(stdin) = value.stdin {
549			command.stdin(Stdio::from(stdin));
550		}
551
552		if let Some(stdout) = value.stdout {
553			command.stdout(Stdio::from(stdout));
554		}
555
556		if let Some(stderr) = value.stderr {
557			command.stderr(Stdio::from(stderr));
558		}
559		command
560	}
561}
562
563impl From<Cmd> for Command {
564	fn from(value: Cmd) -> Self {
565		let mut command = Command::new(value.program.to_os_string());
566		command.args(value.args.to_vec());
567
568		if let Some(stdin) = value.stdin {
569			command.stdin(Stdio::from(stdin));
570		}
571
572		if let Some(stdout) = value.stdout {
573			command.stdout(Stdio::from(stdout));
574		}
575
576		if let Some(stderr) = value.stderr {
577			command.stderr(Stdio::from(stderr));
578		}
579		command
580	}
581}