1use std::io;
2use std::process::Stdio;
3use std::sync::Arc;
4
5use futures::{stream, FutureExt, SinkExt, StreamExt, TryStreamExt};
6use tokio::process::Command;
7use tokio::sync::Notify;
8
9use nails::execution::{
10 self, child_channel, sink_for, stream_for, ChildInput, ChildOutput, ExitCode,
11};
12use nails::{server, Nail};
13
14#[derive(Clone)]
16pub struct ForkNail;
17
18impl Nail for ForkNail {
19 fn spawn(&self, cmd: execution::Command) -> Result<server::Child, io::Error> {
20 let mut child = Command::new(cmd.command.clone())
21 .args(cmd.args)
22 .env_clear()
23 .envs(cmd.env)
24 .current_dir(cmd.working_dir)
25 .kill_on_drop(true)
26 .stdout(Stdio::piped())
27 .stderr(Stdio::piped())
28 .stdin(Stdio::piped())
29 .spawn()?;
30
31 let (stdin_write, stdin_read) = child_channel::<ChildInput>();
33 let stdin = child.stdin.take().unwrap();
34 tokio::spawn(async move {
35 let mut input_stream = stdin_read.map(|child_input| match child_input {
36 ChildInput::Stdin(bytes) => Ok(bytes),
37 });
38 sink_for(stdin)
39 .send_all(&mut input_stream)
40 .map(|_| ())
41 .await;
42 });
43
44 let stdout_stream = stream_for(child.stdout.take().unwrap()).map_ok(ChildOutput::Stdout);
46 let stderr_stream = stream_for(child.stderr.take().unwrap()).map_ok(ChildOutput::Stderr);
47 let output_stream = stream::select(stdout_stream, stderr_stream).boxed();
48
49 let killed = Arc::new(Notify::new());
50 let killed2 = killed.clone();
51
52 let shutdown = async move {
53 killed2.notify_waiters();
54 };
55
56 let exit_code = async move {
57 tokio::select! {
58 res = child.wait() => {
59 res
60 }
61 _ = killed.notified() => {
62 child.kill().await?;
64 child.wait().await
65 }
66 }
67 };
68
69 Ok(server::Child::new(
70 output_stream,
71 Some(stdin_write),
72 exit_code
73 .map(|res| match res {
74 Ok(exit_status) => ExitCode(exit_status.code().unwrap_or(-1)),
75 Err(_) => ExitCode(-1),
76 })
77 .boxed(),
78 Some(shutdown.boxed()),
79 ))
80 }
81}