nails_fork/
lib.rs

1use std::io;
2use std::process::Stdio;
3use std::sync::Arc;
4
5use futures::{stream, FutureExt, SinkExt, StreamExt, TryStreamExt};
6use tokio::process::Command;
7use tokio::sync::Notify;
8
9use nails::execution::{
10    self, child_channel, sink_for, stream_for, ChildInput, ChildOutput, ExitCode,
11};
12use nails::{server, Nail};
13
/// A [`Nail`] implementation that runs each requested command as a forked child process.
///
/// The type is a stateless unit struct, so it is trivially cloneable and
/// default-constructible; `Debug` is derived so it can appear in diagnostics.
#[derive(Clone, Debug, Default)]
pub struct ForkNail;
17
18impl Nail for ForkNail {
19    fn spawn(&self, cmd: execution::Command) -> Result<server::Child, io::Error> {
20        let mut child = Command::new(cmd.command.clone())
21            .args(cmd.args)
22            .env_clear()
23            .envs(cmd.env)
24            .current_dir(cmd.working_dir)
25            .kill_on_drop(true)
26            .stdout(Stdio::piped())
27            .stderr(Stdio::piped())
28            .stdin(Stdio::piped())
29            .spawn()?;
30
31        // Copy inputs to the child.
32        let (stdin_write, stdin_read) = child_channel::<ChildInput>();
33        let stdin = child.stdin.take().unwrap();
34        tokio::spawn(async move {
35            let mut input_stream = stdin_read.map(|child_input| match child_input {
36                ChildInput::Stdin(bytes) => Ok(bytes),
37            });
38            sink_for(stdin)
39                .send_all(&mut input_stream)
40                .map(|_| ())
41                .await;
42        });
43
44        // Fully consume the stdout/stderr streams before waiting on the exit stream.
45        let stdout_stream = stream_for(child.stdout.take().unwrap()).map_ok(ChildOutput::Stdout);
46        let stderr_stream = stream_for(child.stderr.take().unwrap()).map_ok(ChildOutput::Stderr);
47        let output_stream = stream::select(stdout_stream, stderr_stream).boxed();
48
49        let killed = Arc::new(Notify::new());
50        let killed2 = killed.clone();
51
52        let shutdown = async move {
53            killed2.notify_waiters();
54        };
55
56        let exit_code = async move {
57            tokio::select! {
58              res = child.wait() => {
59                res
60              }
61              _ = killed.notified() => {
62                // Kill the child process, and then await it to avoid zombies.
63                child.kill().await?;
64                child.wait().await
65              }
66            }
67        };
68
69        Ok(server::Child::new(
70            output_stream,
71            Some(stdin_write),
72            exit_code
73                .map(|res| match res {
74                    Ok(exit_status) => ExitCode(exit_status.code().unwrap_or(-1)),
75                    Err(_) => ExitCode(-1),
76                })
77                .boxed(),
78            Some(shutdown.boxed()),
79        ))
80    }
81}