1mod errors;
2
3use std::collections::HashMap;
4use std::ffi::OsStr;
5use std::io::{BufRead, BufReader, Read};
6use std::path::Path;
7use std::process::{Child, Command, Stdio};
8use std::sync::mpsc::{channel, Sender};
9use std::time::{Duration, Instant};
10
11#[cfg(unix)]
12use std::os::unix::process::ExitStatusExt;
13
14pub use errors::Error;
15
16pub struct Process<P, Q>
17where
18 P: AsRef<Path>,
19 Q: AsRef<Path>,
20{
21 binary_path: P,
22 working_directory: Q,
23 envs: HashMap<String, String>,
24 timeout: Duration,
25}
26
27impl<P, Q> Process<P, Q>
28where
29 P: AsRef<Path>,
30 Q: AsRef<Path>,
31{
32 pub fn new(binary_path: P, working_directory: Q, envs: HashMap<String, String>, timeout: Duration) -> Self {
33 Self {
34 binary_path,
35 working_directory,
36 envs,
37 timeout,
38 }
39 }
40
41 pub fn spawn<I, S>(&self, args: I) -> Result<ProcessContext, Error>
42 where
43 I: IntoIterator<Item = S>,
44 S: AsRef<OsStr>,
45 {
46 let mut command = Command::new(self.binary_path.as_ref());
47 let command = command
48 .current_dir(self.working_directory.as_ref())
49 .stdout(Stdio::piped())
50 .stderr(Stdio::piped())
51 .args(args)
52 .envs(&self.envs);
53
54 let context = ProcessContext::new(command, self.timeout)?;
55
56 Ok(context)
57 }
58}
59
60pub struct ProcessContext {
61 child: Child,
62 start: Instant,
63 timeout: Duration,
64
65 pub stdout: Vec<String>,
66 pub stderr: Vec<String>,
67 pub exit_code: Option<i32>,
68 #[cfg(unix)]
69 pub signal_code: Option<i32>,
70}
71
72impl ProcessContext {
73 pub fn new(command: &mut Command, timeout: Duration) -> Result<Self, Error> {
74 let start = Instant::now();
75
76 Ok(Self {
77 child: command.spawn()?,
78 start,
79 timeout,
80 stdout: Vec::new(),
81 stderr: Vec::new(),
82 exit_code: None,
83 #[cfg(unix)]
84 signal_code: None,
85 })
86 }
87
88 pub fn wait<'a, P, Q>(mut self, mut stdout: P, mut stderr: Q) -> Result<Self, Error>
89 where
90 P: 'a + FnMut(Option<String>),
91 Q: 'a + FnMut(Option<String>),
92 {
93 let (stdout_tx, stdout_rx) = channel();
94 let stdout_processor = StreamProcessor::new(self.child.stdout.take(), stdout_tx);
95
96 let stdout_reader = std::thread::spawn(|| {
97 stdout_processor.stream();
98 });
99
100 let (stderr_tx, stderr_rx) = channel();
101 let stderr_processor = StreamProcessor::new(self.child.stderr.take(), stderr_tx);
102
103 let stderr_reader = std::thread::spawn(|| {
104 stderr_processor.stream();
105 });
106
107 loop {
108 match self.child.try_wait() {
109 Err(_) => {
110 let _ = self.child.kill().map(|_| self.child.wait());
111 let _ = stdout_reader.join();
112 let _ = stderr_reader.join();
113
114 return Err(Error::TimeoutError);
115 }
116 Ok(Some(status)) => {
117 self.exit_code = status.code();
118
119 if cfg!(unix) {
120 self.signal_code = status.signal();
121 }
122
123 let _ = stdout_reader.join();
124 let _ = stderr_reader.join();
125 return Ok(self);
126 }
127 Ok(None) => {
128 if self.start.elapsed().as_secs() < self.timeout.as_secs() {
129 std::thread::sleep(std::time::Duration::from_millis(20));
130
131 while let Ok(line) = stdout_rx.try_recv() {
132 if let Ok(line) = line {
133 stdout(Some(line.clone()));
134 self.stdout.push(line);
135 } else {
136 stdout(None);
137 self.stdout.push(String::from("<error retrieving stream content>"));
138 }
139 }
140
141 while let Ok(line) = stderr_rx.try_recv() {
142 if let Ok(line) = line {
143 stderr(Some(line.clone()));
144 self.stderr.push(line);
145 } else {
146 stderr(None);
147 self.stderr.push(String::from("<error retrieving stream content>"));
148 }
149 }
150
151 continue;
152 }
153
154 let _ = self.child.kill().map(|_| self.child.wait());
155 let _ = stdout_reader.join();
156 let _ = stderr_reader.join();
157 return Err(Error::TimeoutError);
158 }
159 };
160 }
161 }
162}
163
164pub struct StreamProcessor<T>
165where
166 T: Read,
167{
168 source: Option<T>,
169 sender: Sender<Result<String, Error>>,
170}
171
172impl<T> StreamProcessor<T>
173where
174 T: Read,
175{
176 pub fn new(source: Option<T>, sender: Sender<Result<String, Error>>) -> Self {
177 Self { source, sender }
178 }
179
180 fn stream(self) {
181 if let Some(source) = self.source {
182 for line in BufReader::new(source).lines().enumerate() {
183 let (_, line) = line;
184 let _ = self.sender.send(line.map_err(|e| Error::IOError(e.to_string())));
185 }
186 }
187 }
188}