hermes_tokio_runtime_components/impls/os/
child_process.rs

1use core::time::Duration;
2use std::io::Error as IoError;
3use std::path::Path;
4use std::process::{ExitStatus, Stdio};
5
6use cgp::prelude::*;
7use hermes_async_runtime_components::task::types::future_task::FutureTask;
8use hermes_runtime_components::traits::fs::file_path::HasFilePathType;
9use hermes_runtime_components::traits::fs::read_file::CanReadFileAsString;
10use hermes_runtime_components::traits::os::child_process::{
11    ChildProcessStarter, ChildProcessWaiter, HasChildProcessType, ProvideChildProcessType,
12};
13use hermes_runtime_components::traits::sleep::CanSleep;
14use hermes_runtime_components::traits::spawn::CanSpawnTask;
15use tokio::fs::OpenOptions;
16use tokio::io::{copy, AsyncRead};
17use tokio::process::{Child, Command};
18
19pub struct ProvideTokioChildProcessType;
20
21impl<Runtime> ProvideChildProcessType<Runtime> for ProvideTokioChildProcessType
22where
23    Runtime: Async,
24{
25    type ChildProcess = Child;
26}
27pub struct StartTokioChildProcess;
28
29pub struct PrematureChildProcessExitError {
30    pub exit_status: ExitStatus,
31    pub stdout: String,
32    pub stderr: String,
33}
34
35impl<Runtime> ChildProcessStarter<Runtime> for StartTokioChildProcess
36where
37    Runtime: HasChildProcessType<ChildProcess = Child>
38        + HasFilePathType
39        + CanSleep
40        + CanPipeReaderToFile
41        + CanReadFileAsString
42        + CanRaiseError<IoError>
43        + CanRaiseError<PrematureChildProcessExitError>,
44    Runtime::FilePath: AsRef<Path>,
45{
46    async fn start_child_process(
47        runtime: &Runtime,
48        command_path: &Runtime::FilePath,
49        command_args: &[&str],
50        envs: &[(&str, &str)],
51        stdout_path: Option<&Runtime::FilePath>,
52        stderr_path: Option<&Runtime::FilePath>,
53    ) -> Result<Runtime::ChildProcess, Runtime::Error> {
54        let mut child_process = Command::new(command_path.as_ref())
55            .args(command_args)
56            .envs(Vec::from(envs))
57            .stdin(Stdio::null())
58            .stdout(Stdio::piped())
59            .stderr(Stdio::piped())
60            .kill_on_drop(true)
61            .spawn()
62            .map_err(Runtime::raise_error)?;
63
64        if let Some(stdout_path) = stdout_path {
65            if let Some(stdout) = child_process.stdout.take() {
66                runtime.pipe_reader_to_file(stdout, stdout_path).await?;
67            }
68        }
69
70        if let Some(stderr_path) = stderr_path {
71            if let Some(stderr) = child_process.stderr.take() {
72                runtime.pipe_reader_to_file(stderr, stderr_path).await?;
73            }
74        }
75
76        // Wait for a while and check if the child process exited immediately.
77        // If so, return error since we expect the child process to be running in the background.
78
79        runtime.sleep(Duration::from_secs(1)).await;
80
81        let status = child_process.try_wait().map_err(Runtime::raise_error)?;
82
83        match status {
84            None => Ok(child_process),
85            Some(exit_status) => {
86                let stderr = match stderr_path {
87                    None => String::new(),
88                    Some(stderr_path) => runtime.read_file_as_string(stderr_path).await?,
89                };
90
91                let stdout = match stdout_path {
92                    None => String::new(),
93                    Some(stdout_path) => runtime.read_file_as_string(stdout_path).await?,
94                };
95
96                Err(Runtime::raise_error(PrematureChildProcessExitError {
97                    exit_status,
98                    stdout,
99                    stderr,
100                }))
101            }
102        }
103    }
104}
105
106pub struct WaitChildProcess;
107
108impl<Runtime> ChildProcessWaiter<Runtime> for WaitChildProcess
109where
110    Runtime: HasChildProcessType<ChildProcess = Child>
111        + CanRaiseError<IoError>
112        + CanRaiseError<ExitStatus>,
113{
114    async fn wait_child_process(
115        mut child_process: Runtime::ChildProcess,
116    ) -> Result<(), Runtime::Error> {
117        let status = child_process.wait().await.map_err(Runtime::raise_error)?;
118
119        if status.success() {
120            Ok(())
121        } else {
122            Err(Runtime::raise_error(status))
123        }
124    }
125}
126
127#[async_trait]
128pub trait CanPipeReaderToFile: HasFilePathType + HasErrorType {
129    async fn pipe_reader_to_file(
130        &self,
131        reader: impl AsyncRead + Unpin + Send + Sync + 'static,
132        write_file: &Self::FilePath,
133    ) -> Result<(), Self::Error>;
134}
135
136impl<Runtime> CanPipeReaderToFile for Runtime
137where
138    Runtime: HasFilePathType + CanSpawnTask + CanRaiseError<IoError>,
139    Runtime::FilePath: AsRef<Path>,
140{
141    async fn pipe_reader_to_file(
142        &self,
143        mut reader: impl AsyncRead + Unpin + Send + Sync + 'static,
144        file_path: &Self::FilePath,
145    ) -> Result<(), Self::Error> {
146        let mut file = OpenOptions::new()
147            .append(true)
148            .create(true)
149            .open(file_path)
150            .await
151            .map_err(Runtime::raise_error)?;
152
153        self.spawn_task(FutureTask::new(async move {
154            let _ = copy(&mut reader, &mut file).await;
155        }));
156
157        Ok(())
158    }
159}