1use std::{
2 fs::{self, File},
3 io,
4 path::{Path, PathBuf},
5 process::Stdio,
6};
7
8use nix::{
9 errno::Errno,
10 sys::stat::{self},
11 unistd,
12};
13use tokio::{net::unix, process::Command};
14
15use crate::working_dir::WorkingDir;
16
17use super::{
18 log_reader::LogReader,
19 model::{MessagingType, Process, ProcessBuilder},
20 reader::MessageReader,
21 writer::MessageWriter,
22 Cmd, LoggingType, ProcessId, Runnable,
23};
24
25pub const PROCESS_DATA_DIR_ENV_NAME: &str = "PROCESS_DATA_DIR";
29pub const INPUT_PIPE_ENV_NAME: &str = "INPUT_PIPE";
33pub const OUTPUT_PIPE_ENV_NAME: &str = "OUTPUT_PIPE";
37
38pub struct ProcessSpawner {
39 working_dir: WorkingDir,
40}
41
42impl ProcessSpawner {
43 pub fn new(working_dir: WorkingDir) -> Self {
44 Self { working_dir }
45 }
46
47 pub fn spawn_runnable(
48 &self,
49 id: &ProcessId,
50 runnable: Box<dyn Runnable>,
51 ) -> Result<Process, SpawnerError> {
52 let process_dir = self.working_dir.process_dir(id);
53 fs::create_dir(&process_dir).map_err(SpawnerError::CannotCreateProcessWorkingDir)?;
54 fs::create_dir(self.working_dir.process_data_dir(id))?;
55 match self.try_spawn_runnable(id, &*runnable, &process_dir) {
56 Ok(process) => Ok(process),
57 Err(err) => {
58 let _ = runnable.clean_after_fail(&process_dir);
59 let _ = fs::remove_dir_all(&process_dir);
60 Err(err)
61 }
62 }
63 }
64
65 fn try_spawn_runnable(
66 &self,
67 id: &ProcessId,
68 runnable: &dyn Runnable,
69 process_dir: &Path,
70 ) -> Result<Process, SpawnerError> {
71 let cmd = runnable
72 .bootstrap_cmd(process_dir)
73 .map_err(SpawnerError::BootstrapProcessFailed)?;
74 self.spawn(id, cmd)
75 }
76
77 fn spawn(&self, id: &ProcessId, cmd: Cmd) -> Result<Process, SpawnerError> {
78 let mut child = Command::new(cmd.cmd);
79 if !cmd.args.is_empty() {
80 child.args(cmd.args);
81 }
82 child.kill_on_drop(true);
83
84 if let Some(current_dir) = cmd.options.current_dir {
85 child.current_dir(current_dir);
86 }
87
88 if cmd.options.clear_envs {
89 child.env_clear();
90 }
91
92 for env in cmd.options.envs_to_remove {
93 child.env_remove(env);
94 }
95
96 if !cmd.options.envs.is_empty() {
97 child.envs(cmd.options.envs);
98 }
99
100 let mut process_builder = ProcessBuilder::default();
101
102 child.env(
103 PROCESS_DATA_DIR_ENV_NAME,
104 self.working_dir.process_data_dir(id),
105 );
106
107 child.stdin(Stdio::null());
108 child.stdout(Stdio::null());
109 child.stderr(Stdio::null());
110
111 if let Some(messaging_type) = cmd.options.message_output {
112 let receiver = match messaging_type {
113 MessagingType::StandardIo => {
114 let (sender, receiver) = unix::pipe::pipe().unwrap();
115 child.stdout(sender.into_nonblocking_fd()?);
116 receiver
117 }
118 MessagingType::NamedPipe => {
119 let output_pipe = self.working_dir.message_reader_pipe(id);
120 Self::create_named_pipe(&output_pipe)?;
121 child.env(OUTPUT_PIPE_ENV_NAME, output_pipe.clone());
122 unix::pipe::OpenOptions::new()
123 .read_write(true)
124 .open_receiver(output_pipe)?
125 }
126 };
127 let message_reader = MessageReader::spawn(receiver, cmd.options.output_buffer_capacity);
128 process_builder.message_reader = message_reader.into();
129 }
130
131 if let Some(messaging_type) = cmd.options.message_input {
132 let sender = match messaging_type {
133 MessagingType::StandardIo => {
134 let (sender, receiver) = unix::pipe::pipe().unwrap();
135 child.stdin(receiver.into_nonblocking_fd()?);
136 sender
137 }
138 MessagingType::NamedPipe => {
139 let input_pipe = self.working_dir.message_writer_pipe(id);
140 Self::create_named_pipe(&input_pipe)?;
141 child.env(INPUT_PIPE_ENV_NAME, input_pipe.clone());
142 unix::pipe::OpenOptions::new()
143 .read_write(true)
144 .open_sender(input_pipe)?
145 }
146 };
147 process_builder.message_writer = MessageWriter::spawn(sender)?.into()
148 }
149
150 if let Some(ref logging_type) = cmd.options.logging_type {
151 match logging_type {
152 LoggingType::StdoutOnly => {
153 let path = self.working_dir.logs_stdout(id);
154 let file = Self::create_log_file(&path)?;
155 child.stdout(file);
156 let reader = LogReader::spawn(path.into(), None, None);
157 process_builder.log_reader = reader.into();
158 }
159 LoggingType::StderrOnly => {
160 let path = self.working_dir.logs_stderr(id);
161 let file = Self::create_log_file(&path)?;
162 child.stderr(file);
163 let reader = LogReader::spawn(None, path.into(), None);
164 process_builder.log_reader = reader.into();
165 }
166 LoggingType::StdoutAndStderr => {
167 let stdout_path = self.working_dir.logs_stdout(id);
168 let file = Self::create_log_file(&stdout_path)?;
169 child.stdout(file);
170
171 let stderr_path = self.working_dir.logs_stderr(id);
172 let file = Self::create_log_file(&stderr_path)?;
173 child.stderr(file);
174 let reader = LogReader::spawn(stdout_path.into(), stderr_path.into(), None);
175 process_builder.log_reader = reader.into();
176 }
177 LoggingType::StdoutAndStderrMerged => {
178 let path = self.working_dir.logs_merged(id);
179 let file1 = Self::create_log_file(&path)?;
180 let file2 = file1.try_clone()?;
181 child.stdout(file1);
182 child.stderr(file2);
183 let reader = LogReader::spawn(None, None, path.into());
184 process_builder.log_reader = reader.into();
185 }
186 }
187 }
188
189 let child_handle = child.spawn()?;
190 Ok(process_builder.build(child_handle))
191 }
192
193 fn create_named_pipe(pipe_path: &PathBuf) -> Result<(), SpawnerError> {
194 unistd::mkfifo(pipe_path, stat::Mode::S_IRWXU)
195 .map_err(|err| SpawnerError::CannotCreateNamedPipe(pipe_path.clone(), err))
196 }
197
198 fn create_log_file(path: &PathBuf) -> Result<File, SpawnerError> {
199 let file = File::options().append(true).create(true).open(path)?;
200 Ok(file)
201 }
202}
203
204#[derive(thiserror::Error, Debug)]
205pub enum SpawnerError {
206 #[error("Cannot create named pipe at path: {0}. Error code: {1}")]
207 CannotCreateNamedPipe(PathBuf, Errno),
208 #[error("Cannot spawn process: {0}")]
209 CannotSpawnProcess(#[from] io::Error),
210 #[error("Cannot create process directory: {0}")]
211 CannotCreateProcessWorkingDir(io::Error),
212 #[error("Bootstrap process failed: {0}")]
213 BootstrapProcessFailed(String),
214}
215
216#[cfg(test)]
217mod tests {
218 use std::time::Duration;
219
220 use crate::process::{log_reader::LogSettingsQuery, CmdOptions};
221
222 use super::*;
223
224 use sysinfo::{Pid, System};
225 use tempfile::env::temp_dir;
226
227 #[tokio::test]
228 async fn should_spawn_process() {
229 let working_dir = WorkingDir::new(temp_dir());
230 let spawner = ProcessSpawner::new(working_dir.clone());
231
232 let id = ProcessId::random();
233 let result = spawner.spawn_runnable(&id, Box::new(cat_cmd()));
234 assert!(working_dir.process_dir(&id).exists());
235 assert!(working_dir.process_data_dir(&id).exists());
236 assert!(result.is_ok());
237 }
238
239 #[tokio::test]
240 async fn should_kill_process_on_drop() {
241 let cmd = cat_cmd();
242 let working_dir = WorkingDir::new(temp_dir());
243 let spawner = ProcessSpawner::new(working_dir);
244
245 let id = ProcessId::random();
246 let process = spawner.spawn(&id, cmd).unwrap();
247 let pid = process.child.id().unwrap();
248 let pid = Pid::from_u32(pid);
249
250 let sys = System::new_all();
251 assert!(sys.process(pid).is_some());
252
253 drop(process);
254 tokio::time::sleep(Duration::from_secs(1)).await;
255
256 let sys = System::new_all();
257 assert!(sys.process(pid).is_none());
258 }
259
260 #[tokio::test]
261 async fn should_setup_logging() {
262 let working_dir = WorkingDir::new(temp_dir());
263 let spawner = ProcessSpawner::new(working_dir);
264
265 let process = spawn_process_with_logging(&spawner, LoggingType::StdoutOnly);
267
268 check_logs_settings(&process, LogSettingsQuery::Stdout, true).await;
269 check_logs_settings(&process, LogSettingsQuery::Stderr, false).await;
270 check_logs_settings(&process, LogSettingsQuery::Merged, false).await;
271
272 let process = spawn_process_with_logging(&spawner, LoggingType::StderrOnly);
274
275 check_logs_settings(&process, LogSettingsQuery::Stdout, false).await;
276 check_logs_settings(&process, LogSettingsQuery::Stderr, true).await;
277 check_logs_settings(&process, LogSettingsQuery::Merged, false).await;
278
279 let process = spawn_process_with_logging(&spawner, LoggingType::StdoutAndStderr);
281
282 check_logs_settings(&process, LogSettingsQuery::Stdout, true).await;
283 check_logs_settings(&process, LogSettingsQuery::Stderr, true).await;
284 check_logs_settings(&process, LogSettingsQuery::Merged, false).await;
285
286 let process = spawn_process_with_logging(&spawner, LoggingType::StdoutAndStderrMerged);
288
289 check_logs_settings(&process, LogSettingsQuery::Stdout, false).await;
290 check_logs_settings(&process, LogSettingsQuery::Stderr, false).await;
291 check_logs_settings(&process, LogSettingsQuery::Merged, true).await;
292 }
293
294 fn spawn_process_with_logging(spawner: &ProcessSpawner, logging_type: LoggingType) -> Process {
295 let id = ProcessId::random();
296 let cmd = echo_cmd(logging_type);
297 spawner.spawn_runnable(&id, Box::new(cmd)).unwrap()
298 }
299
300 async fn check_logs_settings(process: &Process, query: LogSettingsQuery, expected: bool) {
301 assert_eq!(
302 expected,
303 process
304 .log_reader
305 .as_ref()
306 .unwrap()
307 .check_logs_settings(query)
308 .await
309 .unwrap()
310 );
311 }
312
313 fn cat_cmd() -> Cmd {
314 Cmd::new("cat")
315 }
316
317 fn echo_cmd(logging_type: LoggingType) -> Cmd {
318 Cmd::with_args_and_options(
319 "echo",
320 ["-n", "message"],
321 CmdOptions::with_logging(logging_type),
322 )
323 }
324}