proc_heim/process/
spawner.rs

1use std::{
2    fs::{self, File},
3    io,
4    path::{Path, PathBuf},
5    process::Stdio,
6};
7
8use nix::{
9    errno::Errno,
10    sys::stat::{self},
11    unistd,
12};
13use tokio::{net::unix, process::Command};
14
15use crate::working_dir::WorkingDir;
16
17use super::{
18    log_reader::LogReader,
19    model::{MessagingType, Process, ProcessBuilder},
20    reader::MessageReader,
21    writer::MessageWriter,
22    Cmd, LoggingType, ProcessId, Runnable,
23};
24
/// Environment variable representing the directory intended for storing temporary process data.
///
/// The spawner sets this variable on every child it spawns, pointing it at the
/// process data directory derived from the working directory and the process id.
///
/// The directory will be deleted when the process is killed manually.
pub const PROCESS_DATA_DIR_ENV_NAME: &str = "PROCESS_DATA_DIR";
/// Environment variable representing a named pipe path used to read incoming messages in the child process.
///
/// It is set only when message input is configured to use a named pipe; with
/// standard I/O messaging the child reads from its stdin instead.
///
/// See [`ProcessManagerHandle::subscribe_message_stream`](crate::manager::ProcessManagerHandle::subscribe_message_stream) example.
pub const INPUT_PIPE_ENV_NAME: &str = "INPUT_PIPE";
/// Environment variable representing a named pipe path used to send messages to the parent process.
///
/// It is set only when message output is configured to use a named pipe; with
/// standard I/O messaging the child writes to its stdout instead.
///
/// See [`ProcessManagerHandle::subscribe_message_stream`](crate::manager::ProcessManagerHandle::subscribe_message_stream) example.
pub const OUTPUT_PIPE_ENV_NAME: &str = "OUTPUT_PIPE";
37
/// Spawns child processes and prepares the files they need (directories,
/// named pipes and log files) before the child is started.
pub struct ProcessSpawner {
    /// Source of every per-process path: process and data directories,
    /// message pipes and log files are all derived from it and a process id.
    working_dir: WorkingDir,
}
41
42impl ProcessSpawner {
43    pub fn new(working_dir: WorkingDir) -> Self {
44        Self { working_dir }
45    }
46
47    pub fn spawn_runnable(
48        &self,
49        id: &ProcessId,
50        runnable: Box<dyn Runnable>,
51    ) -> Result<Process, SpawnerError> {
52        let process_dir = self.working_dir.process_dir(id);
53        fs::create_dir(&process_dir).map_err(SpawnerError::CannotCreateProcessWorkingDir)?;
54        fs::create_dir(self.working_dir.process_data_dir(id))?;
55        match self.try_spawn_runnable(id, &*runnable, &process_dir) {
56            Ok(process) => Ok(process),
57            Err(err) => {
58                let _ = runnable.clean_after_fail(&process_dir);
59                let _ = fs::remove_dir_all(&process_dir);
60                Err(err)
61            }
62        }
63    }
64
65    fn try_spawn_runnable(
66        &self,
67        id: &ProcessId,
68        runnable: &dyn Runnable,
69        process_dir: &Path,
70    ) -> Result<Process, SpawnerError> {
71        let cmd = runnable
72            .bootstrap_cmd(process_dir)
73            .map_err(SpawnerError::BootstrapProcessFailed)?;
74        self.spawn(id, cmd)
75    }
76
77    fn spawn(&self, id: &ProcessId, cmd: Cmd) -> Result<Process, SpawnerError> {
78        let mut child = Command::new(cmd.cmd);
79        if !cmd.args.is_empty() {
80            child.args(cmd.args);
81        }
82        child.kill_on_drop(true);
83
84        if let Some(current_dir) = cmd.options.current_dir {
85            child.current_dir(current_dir);
86        }
87
88        if cmd.options.clear_envs {
89            child.env_clear();
90        }
91
92        for env in cmd.options.envs_to_remove {
93            child.env_remove(env);
94        }
95
96        if !cmd.options.envs.is_empty() {
97            child.envs(cmd.options.envs);
98        }
99
100        let mut process_builder = ProcessBuilder::default();
101
102        child.env(
103            PROCESS_DATA_DIR_ENV_NAME,
104            self.working_dir.process_data_dir(id),
105        );
106
107        child.stdin(Stdio::null());
108        child.stdout(Stdio::null());
109        child.stderr(Stdio::null());
110
111        if let Some(messaging_type) = cmd.options.message_output {
112            let receiver = match messaging_type {
113                MessagingType::StandardIo => {
114                    let (sender, receiver) = unix::pipe::pipe().unwrap();
115                    child.stdout(sender.into_nonblocking_fd()?);
116                    receiver
117                }
118                MessagingType::NamedPipe => {
119                    let output_pipe = self.working_dir.message_reader_pipe(id);
120                    Self::create_named_pipe(&output_pipe)?;
121                    child.env(OUTPUT_PIPE_ENV_NAME, output_pipe.clone());
122                    unix::pipe::OpenOptions::new()
123                        .read_write(true)
124                        .open_receiver(output_pipe)?
125                }
126            };
127            let message_reader = MessageReader::spawn(receiver, cmd.options.output_buffer_capacity);
128            process_builder.message_reader = message_reader.into();
129        }
130
131        if let Some(messaging_type) = cmd.options.message_input {
132            let sender = match messaging_type {
133                MessagingType::StandardIo => {
134                    let (sender, receiver) = unix::pipe::pipe().unwrap();
135                    child.stdin(receiver.into_nonblocking_fd()?);
136                    sender
137                }
138                MessagingType::NamedPipe => {
139                    let input_pipe = self.working_dir.message_writer_pipe(id);
140                    Self::create_named_pipe(&input_pipe)?;
141                    child.env(INPUT_PIPE_ENV_NAME, input_pipe.clone());
142                    unix::pipe::OpenOptions::new()
143                        .read_write(true)
144                        .open_sender(input_pipe)?
145                }
146            };
147            process_builder.message_writer = MessageWriter::spawn(sender)?.into()
148        }
149
150        if let Some(ref logging_type) = cmd.options.logging_type {
151            match logging_type {
152                LoggingType::StdoutOnly => {
153                    let path = self.working_dir.logs_stdout(id);
154                    let file = Self::create_log_file(&path)?;
155                    child.stdout(file);
156                    let reader = LogReader::spawn(path.into(), None, None);
157                    process_builder.log_reader = reader.into();
158                }
159                LoggingType::StderrOnly => {
160                    let path = self.working_dir.logs_stderr(id);
161                    let file = Self::create_log_file(&path)?;
162                    child.stderr(file);
163                    let reader = LogReader::spawn(None, path.into(), None);
164                    process_builder.log_reader = reader.into();
165                }
166                LoggingType::StdoutAndStderr => {
167                    let stdout_path = self.working_dir.logs_stdout(id);
168                    let file = Self::create_log_file(&stdout_path)?;
169                    child.stdout(file);
170
171                    let stderr_path = self.working_dir.logs_stderr(id);
172                    let file = Self::create_log_file(&stderr_path)?;
173                    child.stderr(file);
174                    let reader = LogReader::spawn(stdout_path.into(), stderr_path.into(), None);
175                    process_builder.log_reader = reader.into();
176                }
177                LoggingType::StdoutAndStderrMerged => {
178                    let path = self.working_dir.logs_merged(id);
179                    let file1 = Self::create_log_file(&path)?;
180                    let file2 = file1.try_clone()?;
181                    child.stdout(file1);
182                    child.stderr(file2);
183                    let reader = LogReader::spawn(None, None, path.into());
184                    process_builder.log_reader = reader.into();
185                }
186            }
187        }
188
189        let child_handle = child.spawn()?;
190        Ok(process_builder.build(child_handle))
191    }
192
193    fn create_named_pipe(pipe_path: &PathBuf) -> Result<(), SpawnerError> {
194        unistd::mkfifo(pipe_path, stat::Mode::S_IRWXU)
195            .map_err(|err| SpawnerError::CannotCreateNamedPipe(pipe_path.clone(), err))
196    }
197
198    fn create_log_file(path: &PathBuf) -> Result<File, SpawnerError> {
199        let file = File::options().append(true).create(true).open(path)?;
200        Ok(file)
201    }
202}
203
/// Errors that can occur while preparing or spawning a process.
#[derive(thiserror::Error, Debug)]
pub enum SpawnerError {
    /// Creating a named pipe failed; carries the pipe path and the reported `Errno`.
    #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
    CannotCreateNamedPipe(PathBuf, Errno),
    /// An I/O error occurred while configuring or spawning the child.
    ///
    /// Because of `#[from]`, every `io::Error` propagated with `?` ends up in
    /// this variant, not only a failure of the spawn call itself.
    #[error("Cannot spawn process: {0}")]
    CannotSpawnProcess(#[from] io::Error),
    /// The working directory of the process could not be created.
    #[error("Cannot create process directory: {0}")]
    CannotCreateProcessWorkingDir(io::Error),
    /// The runnable failed to produce its bootstrap command; carries its error message.
    #[error("Bootstrap process failed: {0}")]
    BootstrapProcessFailed(String),
}
215
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::process::{log_reader::LogSettingsQuery, CmdOptions};

    use super::*;

    use sysinfo::{Pid, System};
    use tempfile::env::temp_dir;

    /// Spawning a runnable creates both process directories and succeeds.
    #[tokio::test]
    async fn should_spawn_process() {
        let working_dir = WorkingDir::new(temp_dir());
        let spawner = ProcessSpawner::new(working_dir.clone());
        let id = ProcessId::random();

        let result = spawner.spawn_runnable(&id, Box::new(cat_cmd()));

        assert!(working_dir.process_dir(&id).exists());
        assert!(working_dir.process_data_dir(&id).exists());
        assert!(result.is_ok());
    }

    /// Dropping the process handle kills the underlying OS process.
    #[tokio::test]
    async fn should_kill_process_on_drop() {
        let cmd = cat_cmd();
        let spawner = ProcessSpawner::new(WorkingDir::new(temp_dir()));
        let id = ProcessId::random();

        let process = spawner.spawn(&id, cmd).unwrap();
        let pid = Pid::from_u32(process.child.id().unwrap());
        assert!(is_running(pid));

        drop(process);
        // Give the runtime a moment to deliver the kill.
        tokio::time::sleep(Duration::from_secs(1)).await;

        assert!(!is_running(pid));
    }

    /// Every logging type enables exactly the log streams it names.
    #[tokio::test]
    async fn should_setup_logging() {
        let spawner = ProcessSpawner::new(WorkingDir::new(temp_dir()));

        // StdoutOnly
        let process = spawn_process_with_logging(&spawner, LoggingType::StdoutOnly);
        assert_log_flags(&process, true, false, false).await;

        // StderrOnly
        let process = spawn_process_with_logging(&spawner, LoggingType::StderrOnly);
        assert_log_flags(&process, false, true, false).await;

        // StdoutAndStderr
        let process = spawn_process_with_logging(&spawner, LoggingType::StdoutAndStderr);
        assert_log_flags(&process, true, true, false).await;

        // StdoutAndStderrMerged
        let process = spawn_process_with_logging(&spawner, LoggingType::StdoutAndStderrMerged);
        assert_log_flags(&process, false, false, true).await;
    }

    /// Reports whether a fresh system snapshot still lists `pid`.
    fn is_running(pid: Pid) -> bool {
        System::new_all().process(pid).is_some()
    }

    /// Checks the stdout, stderr and merged log settings of `process`, in that order.
    async fn assert_log_flags(process: &Process, stdout: bool, stderr: bool, merged: bool) {
        check_logs_settings(process, LogSettingsQuery::Stdout, stdout).await;
        check_logs_settings(process, LogSettingsQuery::Stderr, stderr).await;
        check_logs_settings(process, LogSettingsQuery::Merged, merged).await;
    }

    /// Spawns an `echo` process configured with `logging_type` under a random id.
    fn spawn_process_with_logging(spawner: &ProcessSpawner, logging_type: LoggingType) -> Process {
        let cmd = echo_cmd(logging_type);
        spawner
            .spawn_runnable(&ProcessId::random(), Box::new(cmd))
            .unwrap()
    }

    /// Asserts that the log reader of `process` answers `query` with `expected`.
    async fn check_logs_settings(process: &Process, query: LogSettingsQuery, expected: bool) {
        let log_reader = process.log_reader.as_ref().unwrap();
        let actual = log_reader.check_logs_settings(query).await.unwrap();
        assert_eq!(expected, actual);
    }

    /// A command that keeps running until it is killed (stdin is never fed).
    fn cat_cmd() -> Cmd {
        Cmd::new("cat")
    }

    /// A short-lived command that prints `message` with the given logging configuration.
    fn echo_cmd(logging_type: LoggingType) -> Cmd {
        let options = CmdOptions::with_logging(logging_type);
        Cmd::with_args_and_options("echo", ["-n", "message"], options)
    }
}
324}