1use log::error;
2use std::io;
3use std::pin::Pin;
4use std::process::Output;
5use std::sync::Arc;
6use tokio::{process::Child, runtime::Handle, sync::Mutex, task};
7
/// Snapshot of how a wrapped process finished.
///
/// The record is small and `Copy`, so it can be cached and handed out by
/// value. It also implements `PartialEq`/`Eq` so that two snapshots can be
/// compared directly (for example in assertions).
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ExitStatus {
    /// Id of the process this status belongs to.
    pub id: u32,

    /// Whether the process is considered to have finished successfully.
    pub is_success: bool,

    /// Exit code reported for the process, or `None` when no code is
    /// available.
    pub exit_code: Option<i32>,
}
14
15#[derive(Debug)]
16pub struct LocalProc {
17 id: u32,
18 inner: Child,
19 exit_status: Option<ExitStatus>,
20
21 supports_stdin: bool,
22 supports_stdout: bool,
23 supports_stderr: bool,
24
25 io_handle: Option<task::JoinHandle<()>>,
27
28 stdout_buf: Arc<Mutex<Vec<u8>>>,
30
31 stderr_buf: Arc<Mutex<Vec<u8>>>,
33}
34
35impl LocalProc {
36 pub fn new(child: Child) -> Self {
37 Self {
38 id: child.id(),
39 exit_status: None,
40 supports_stdin: child.stdin.is_some(),
41 supports_stdout: child.stdout.is_some(),
42 supports_stderr: child.stderr.is_some(),
43 inner: child,
44 io_handle: None,
45 stdout_buf: Arc::new(Mutex::new(Vec::new())),
46 stderr_buf: Arc::new(Mutex::new(Vec::new())),
47 }
48 }
49
50 pub fn id(&self) -> u32 {
51 self.id
52 }
53
54 pub fn inner(&self) -> &Child {
55 &self.inner
56 }
57
58 pub async fn exit_status(&mut self) -> Option<ExitStatus> {
59 use futures::future::{poll_fn, Future};
60 use std::task::Poll;
61
62 match self.exit_status {
63 None => {
64 let exit_status =
65 poll_fn(|ctx| match Pin::new(&mut self.inner).poll(ctx) {
66 Poll::Ready(res) => Poll::Ready(Some(res)),
67 Poll::Pending => Poll::Ready(None),
68 })
69 .await;
70
71 if let Some(status) = exit_status {
72 self.exit_status = Some(ExitStatus {
73 id: self.id,
74 is_success: status.is_ok(),
75 exit_code: status.ok().and_then(|s| s.code()),
76 });
77 }
78
79 self.exit_status
80 }
81 x => x,
82 }
83 }
84
85 pub fn spawn(mut self) -> Self {
88 if self.io_handle.is_some() {
90 return self;
91 }
92
93 let handle = Handle::current();
94
95 let stdout = self.inner.stdout.take();
96 let stderr = self.inner.stderr.take();
97
98 let stdout_buf = Arc::clone(&self.stdout_buf);
99 let stderr_buf = Arc::clone(&self.stderr_buf);
100
101 let io_handle = handle.spawn(async move {
102 let _ = tokio::join!(
103 async {
104 use tokio::io::AsyncReadExt;
105
106 if let Some(mut stdout) = stdout {
107 let mut buf = [0; 1024];
108
109 loop {
110 match stdout.read(&mut buf).await {
111 Ok(size) if size > 0 => {
112 stdout_buf
113 .lock()
114 .await
115 .extend_from_slice(&buf[..size]);
116 }
117 Ok(_) => break,
118 Err(x) => {
119 error!("stdout reader died: {}", x);
120 break;
121 }
122 }
123 }
124 }
125 },
126 async {
127 use tokio::io::AsyncReadExt;
128
129 if let Some(mut stderr) = stderr {
130 let mut buf = [0; 1024];
131
132 loop {
133 match stderr.read(&mut buf).await {
134 Ok(size) if size > 0 => {
135 stderr_buf
136 .lock()
137 .await
138 .extend_from_slice(&buf[..size]);
139 }
140 Ok(_) => break,
141 Err(x) => {
142 error!("stderr reader died: {}", x);
143 break;
144 }
145 }
146 }
147 }
148 }
149 );
150 });
151
152 self.io_handle = Some(io_handle);
153
154 self
155 }
156
157 pub async fn write_stdin(&mut self, buf: &[u8]) -> io::Result<()> {
158 use tokio::io::AsyncWriteExt;
159
160 match self.inner.stdin.as_mut() {
161 Some(stdin) => {
162 let mut result = stdin.write_all(buf).await;
163 if result.is_ok() {
164 result = stdin.flush().await;
165 }
166 result
167 }
168 None => Err(io::Error::from(io::ErrorKind::BrokenPipe)),
169 }
170 }
171
172 pub async fn read_stdout(&mut self) -> io::Result<Vec<u8>> {
173 if self.supports_stdout {
174 Ok(self.stdout_buf.lock().await.drain(..).collect())
175 } else {
176 Err(io::Error::from(io::ErrorKind::BrokenPipe))
177 }
178 }
179
180 pub async fn read_stderr(&mut self) -> io::Result<Vec<u8>> {
181 if self.supports_stderr {
182 Ok(self.stderr_buf.lock().await.drain(..).collect())
183 } else {
184 Err(io::Error::from(io::ErrorKind::BrokenPipe))
185 }
186 }
187
188 pub fn kill(&mut self) -> io::Result<()> {
189 self.inner.kill()
190 }
191
192 pub async fn kill_and_wait(mut self) -> io::Result<Output> {
193 self.kill()?;
194 self.inner.wait_with_output().await
195 }
196}
197
/// Tests for [`LocalProc`]. They spawn real processes (`cat`, `echo`, `rev`,
/// `sleep`), so those commands need to be available where the tests run.
#[cfg(test)]
mod tests {
    use super::*;
    use std::process::Stdio;
    use std::time::Duration;
    use tokio::process::Command;
    use tokio::time::{delay_for, timeout};
    use tokio::{fs, io};

    /// Selects which buffered output stream a helper should read.
    #[derive(Copy, Clone)]
    enum Pipe {
        Stdout,
        Stderr,
    }

    /// Spawns `program` with `args` and the given stdio configuration,
    /// panicking if the process cannot be started.
    fn spawn_child(
        program: &str,
        args: &[&str],
        stdin: Stdio,
        stdout: Stdio,
        stderr: Stdio,
    ) -> Child {
        Command::new(program)
            .args(args)
            .stdin(stdin)
            .stdout(stdout)
            .stderr(stderr)
            .spawn()
            .unwrap()
    }

    /// Polls the chosen stream until it yields a non-empty chunk, yielding
    /// to the scheduler between attempts. Panics on a read error.
    async fn read_until_nonempty(
        local_proc: &mut LocalProc,
        pipe: Pipe,
    ) -> Vec<u8> {
        loop {
            let result = match pipe {
                Pipe::Stdout => local_proc.read_stdout().await,
                Pipe::Stderr => local_proc.read_stderr().await,
            };

            match result {
                Ok(buf) if !buf.is_empty() => break buf,
                // Nothing yet; let the reader task make progress.
                Ok(_) => task::yield_now().await,
                Err(x) => panic!("Unexpected error: {}", x),
            }
        }
    }

    /// Returns the first non-empty chunk from the chosen stream, panicking
    /// if none arrives within 10 milliseconds.
    async fn first_output(local_proc: &mut LocalProc, pipe: Pipe) -> Vec<u8> {
        timeout(
            Duration::from_millis(10),
            read_until_nonempty(local_proc, pipe),
        )
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn test_id_should_return_child_id() {
        let child = spawn_child(
            "cat",
            &[],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let id = child.id();
        let local_proc = LocalProc::new(child);
        assert_eq!(id, local_proc.id());
    }

    #[tokio::test]
    async fn test_write_stdin_should_return_an_error_if_not_piped() {
        let child = spawn_child(
            "cat",
            &[],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child);
        match local_proc.write_stdin(&[1, 2, 3]).await {
            Ok(_) => panic!("Successfully wrote to stdin when not piped"),
            Err(x) => assert_eq!(x.kind(), io::ErrorKind::BrokenPipe),
        }
    }

    #[tokio::test]
    async fn test_write_stdin_should_write_contents_to_process() {
        use std::io::SeekFrom;
        use tokio::io::AsyncReadExt;

        // `cat` echoes its stdin into a temp file that we then read back.
        let f = tempfile::tempfile().unwrap();
        let child = spawn_child(
            "cat",
            &[],
            Stdio::piped(),
            Stdio::from(f.try_clone().unwrap()),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child);
        if local_proc.write_stdin(b"test").await.is_err() {
            panic!("Failed to write to stdin");
        }

        let mut f = fs::File::from_std(f);
        let written = timeout(Duration::from_millis(10), async {
            loop {
                // Re-read the whole file each time until `cat` has written
                // something to it.
                let mut s = String::new();
                f.seek(SeekFrom::Start(0)).await.unwrap();
                f.read_to_string(&mut s).await.unwrap();
                if !s.is_empty() {
                    break s;
                }

                task::yield_now().await;
            }
        })
        .await;

        match written {
            Ok(s) => assert_eq!(s, "test", "Unexpected output"),
            Err(x) => panic!("Failed to write to file: {}", x),
        }
    }

    #[tokio::test]
    async fn test_read_stdout_should_return_an_error_if_not_piped() {
        let child = spawn_child(
            "echo",
            &["test"],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child);
        match local_proc.read_stdout().await {
            Ok(_) => {
                panic!("Unexpectedly succeeded in reading stdout not piped")
            }
            Err(x) => assert_eq!(x.kind(), io::ErrorKind::BrokenPipe),
        }
    }

    #[tokio::test]
    async fn test_read_stdout_should_return_empty_content_if_none_available() {
        let child = spawn_child(
            "echo",
            &["test"],
            Stdio::null(),
            Stdio::piped(),
            Stdio::null(),
        );

        // The reader task is deliberately not started, so nothing can have
        // been buffered yet.
        let mut local_proc = LocalProc::new(child);

        match local_proc.read_stdout().await {
            Ok(buf) => assert!(buf.is_empty()),
            Err(x) => panic!("Unexpected error: {}", x),
        }
    }

    #[tokio::test]
    async fn test_read_stdout_should_not_return_content_returned_previously() {
        let child = spawn_child(
            "echo",
            &["test"],
            Stdio::null(),
            Stdio::piped(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child).spawn();

        let first = first_output(&mut local_proc, Pipe::Stdout).await;
        assert!(!first.is_empty(), "Failed to get first batch of content");

        assert!(
            local_proc.read_stdout().await.unwrap().is_empty(),
            "Unexpectedly got content when nothing should be left"
        );
    }

    #[tokio::test]
    async fn test_read_stdout_should_return_content_if_available() {
        let child = spawn_child(
            "echo",
            &["test"],
            Stdio::null(),
            Stdio::piped(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child).spawn();

        let buf = first_output(&mut local_proc, Pipe::Stdout).await;

        assert_eq!(buf, b"test\n");
    }

    #[tokio::test]
    async fn test_read_stderr_should_return_an_error_if_not_piped() {
        let child = spawn_child(
            "rev",
            &["--aaa"],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child);
        match local_proc.read_stderr().await {
            Ok(_) => {
                panic!("Unexpectedly succeeded in reading stderr not piped")
            }
            Err(x) => assert_eq!(x.kind(), io::ErrorKind::BrokenPipe),
        }
    }

    #[tokio::test]
    async fn test_read_stderr_should_return_empty_content_if_none_available() {
        let child = spawn_child(
            "rev",
            &["--aaa"],
            Stdio::null(),
            Stdio::null(),
            Stdio::piped(),
        );

        // The reader task is deliberately not started, so nothing can have
        // been buffered yet.
        let mut local_proc = LocalProc::new(child);

        match local_proc.read_stderr().await {
            Ok(buf) => assert!(buf.is_empty()),
            Err(x) => panic!("Unexpected error: {}", x),
        }
    }

    #[tokio::test]
    async fn test_read_stderr_should_not_return_content_returned_previously() {
        let child = spawn_child(
            "rev",
            &["--aaa"],
            Stdio::null(),
            Stdio::null(),
            Stdio::piped(),
        );

        let mut local_proc = LocalProc::new(child).spawn();

        let first = first_output(&mut local_proc, Pipe::Stderr).await;
        assert!(!first.is_empty(), "Failed to get first batch of content");

        assert!(
            local_proc.read_stderr().await.unwrap().is_empty(),
            "Unexpectedly got content when nothing should be left"
        );
    }

    #[tokio::test]
    async fn test_read_stderr_should_return_content_if_available() {
        let child = spawn_child(
            "rev",
            &["--aaa"],
            Stdio::null(),
            Stdio::null(),
            Stdio::piped(),
        );

        let mut local_proc = LocalProc::new(child).spawn();

        let buf = first_output(&mut local_proc, Pipe::Stderr).await;

        assert!(!buf.is_empty());
    }

    #[tokio::test]
    async fn test_exit_status_should_return_none_if_not_exited() {
        let child = spawn_child(
            "sleep",
            &["60"],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child).spawn();
        let status = local_proc.exit_status().await;

        // Best effort: do not leave a 60-second `sleep` behind, whatever the
        // outcome of the assertion below.
        let _ = local_proc.kill();

        if let Some(x) = status {
            panic!("Unexpected content: {:?}", x);
        }
    }

    #[tokio::test]
    async fn test_exit_status_should_return_some_status_if_exited() {
        let child = spawn_child(
            "echo",
            &[],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let id = child.id();
        let mut local_proc = LocalProc::new(child).spawn();

        // Give the process a moment to run to completion.
        delay_for(Duration::from_millis(10)).await;

        match local_proc.exit_status().await {
            Some(status) => assert_eq!(status.id, id),
            None => panic!("Unexpectedly got no result"),
        }
    }

    #[tokio::test]
    async fn test_exit_status_should_support_being_called_multiple_times_after_exit(
    ) {
        let child = spawn_child(
            "echo",
            &[],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child).spawn();

        // Give the process a moment to run to completion.
        delay_for(Duration::from_millis(10)).await;

        assert!(local_proc.exit_status().await.is_some());
        assert!(local_proc.exit_status().await.is_some());
    }

    #[tokio::test]
    async fn kill_should_send_kill_request_to_process_without_waiting() {
        let child = spawn_child(
            "sleep",
            &["60"],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let mut local_proc = LocalProc::new(child).spawn();
        if let Err(x) = local_proc.kill() {
            panic!("Unexpected error: {}", x);
        }
    }

    #[tokio::test]
    async fn test_kill_and_wait_should_kill_and_return_process_result() {
        let child = spawn_child(
            "sleep",
            &["60"],
            Stdio::null(),
            Stdio::null(),
            Stdio::null(),
        );

        let local_proc = LocalProc::new(child).spawn();
        if let Err(x) = local_proc.kill_and_wait().await {
            panic!("Unexpected error: {}", x);
        }
    }
}