1use super::{utils::command::create_command, Action, ActionError};
2use crate::context::Context;
3use duct::{Expression, ReaderHandle};
4use log::{debug, error, info, trace, warn};
5use std::{
6 io::{BufRead, BufReader},
7 sync::{Arc, RwLock},
8 thread::{self, sleep},
9 time::Duration,
10};
11use thiserror::Error;
12
13#[cfg(unix)]
14use nix::{errno::Errno, sys::signal::Signal};
15#[cfg(unix)]
16use std::{os::unix::process::ExitStatusExt, str::FromStr};
17
/// Name of this action; it is exported to the spawned process through the
/// `GW_ACTION_NAME` environment variable.
const ACTION_NAME: &str = "PROCESS";
19
/// Errors that can occur while configuring, starting or stopping a process.
#[derive(Debug, Error, PartialEq, Eq)]
pub enum ProcessError {
    /// The configured command string could not be turned into a runnable command.
    #[error("the command {0:?} cannot be parsed")]
    CommandParseFailure(String),
    /// The configured stop signal name could not be parsed into a signal.
    #[error("the signal {0} is not valid")]
    SignalParseFailure(String),
    /// Spawning the process failed; carries the underlying error message.
    #[error("the script cannot start: {0}")]
    StartFailure(String),
    /// Stopping the process failed; carries the underlying error message.
    #[error("the script cannot be stopped: {0}")]
    StopFailure(String),
    /// Sending the stop signal to the process failed (unix only).
    #[cfg(unix)]
    #[error("killing the process failed with error: {0}")]
    KillFailed(#[from] Errno),
    /// The lock guarding the child process was poisoned.
    #[error("the mutex is poisoned")]
    MutexPoisoned,
}
43
44impl From<ProcessError> for ActionError {
45 fn from(value: ProcessError) -> Self {
46 match value {
47 ProcessError::CommandParseFailure(_) | ProcessError::SignalParseFailure(_) => {
48 ActionError::Misconfigured(value.to_string())
49 }
50 _ => ActionError::FailedAction(value.to_string()),
51 }
52 }
53}
54
/// Parameters describing how a process is started, retried and stopped.
#[derive(Debug, Clone)]
pub struct ProcessParams {
    /// Working directory of the process; also exported as `GW_DIRECTORY`.
    directory: String,
    /// Command identifier returned by `create_command`, used in log messages.
    // NOTE(review): the tests suggest this is the program name without arguments — confirm.
    command: String,
    /// The expression that is spawned whenever the process is (re)started.
    process: Expression,
    /// How many times the process is started again after it exits.
    retries: u32,
    /// Signal sent to the process when it is stopped (defaults to `SIGTERM`).
    #[cfg(unix)]
    stop_signal: Signal,
    /// How long to wait after the stop signal before killing the process
    /// (defaults to 10 seconds).
    #[cfg(unix)]
    stop_timeout: Duration,
    /// Whether the command was requested to run in a shell.
    runs_in_shell: bool,
}
68
69impl ProcessParams {
70 pub fn new(
71 original_command: String,
72 directory: String,
73 runs_in_shell: bool,
74 ) -> Result<ProcessParams, ProcessError> {
75 let (command, process) = create_command(&original_command, runs_in_shell)
76 .ok_or(ProcessError::CommandParseFailure(original_command.clone()))?;
77
78 Ok(ProcessParams {
79 directory,
80 command,
81 process,
82 retries: 0,
83 #[cfg(unix)]
84 stop_signal: Signal::SIGTERM,
85 #[cfg(unix)]
86 stop_timeout: Duration::from_secs(10),
87 runs_in_shell,
88 })
89 }
90
91 pub fn set_retries(&mut self, retries: u32) {
92 self.retries = retries;
93 }
94
95 #[cfg_attr(not(unix), allow(unused_variables))]
96 pub fn set_stop_signal(&mut self, stop_signal: String) -> Result<(), ProcessError> {
97 #[cfg(unix)]
98 {
99 self.stop_signal = Signal::from_str(&stop_signal)
100 .map_err(|_| ProcessError::SignalParseFailure(stop_signal))?;
101 }
102
103 Ok(())
104 }
105
106 #[cfg_attr(not(unix), allow(unused_variables))]
107 pub fn set_stop_timeout(&mut self, stop_timeout: Duration) {
108 #[cfg(unix)]
109 {
110 self.stop_timeout = stop_timeout;
111 }
112 }
113}
114
115#[derive(Debug)]
117#[cfg_attr(unix, allow(dead_code))]
118pub struct Process {
119 child: Arc<RwLock<Option<ReaderHandle>>>,
120 #[cfg(unix)]
121 stop_signal: Signal,
122 #[cfg(unix)]
123 stop_timeout: Duration,
124}
125
126impl Process {
127 fn start_child(params: &ProcessParams) -> Result<ReaderHandle, ProcessError> {
128 info!(
129 "Starting process {:?} {}in {}.",
130 params.command,
131 if params.runs_in_shell {
132 "in a shell "
133 } else {
134 ""
135 },
136 params.directory,
137 );
138
139 let child = params
141 .process
142 .dir(¶ms.directory)
143 .stderr_to_stdout()
144 .env("CI", "true")
145 .env("GW_ACTION_NAME", ACTION_NAME)
146 .env("GW_DIRECTORY", ¶ms.directory)
147 .unchecked()
148 .reader()
149 .map_err(|err| ProcessError::StartFailure(err.to_string()))?;
150
151 if let Some(pid) = child.pids().first() {
152 trace!("Started process with pid {pid}.",);
153 }
154
155 Ok(child)
156 }
157
158 fn start(params: &ProcessParams) -> Result<Process, ProcessError> {
159 let child = Arc::new(RwLock::new(Some(Process::start_child(params)?)));
160
161 let command_id = params.command.clone();
162 let max_retries = params.retries;
163 let thread_params = params.clone();
164 let thread_child = child.clone();
165 thread::spawn(move || {
166 let mut tries = max_retries + 1;
167
168 loop {
169 trace!("Locking the subprocess to get the stdout.");
170 if let Some(stdout) = thread_child.read().unwrap().as_ref() {
171 let mut reader = BufReader::new(stdout).lines();
172 trace!("Reading lines from the stdout.");
173 while let Some(Ok(line)) = reader.next() {
174 debug!("[{command_id}] {line}");
175 }
176
177 #[cfg_attr(not(unix), allow(unused_variables))]
178 if let Ok(Some(output)) = stdout.try_wait() {
179 #[cfg(unix)]
180 if output.status.signal().is_some() {
181 trace!("Process is signalled, no retries necessary.");
182 return;
183 }
184 }
185 } else {
186 error!("Failed taking the stdout of process.");
187 break;
188 }
189
190 tries -= 1;
191 if tries == 0 {
192 break;
193 }
194
195 warn!(
196 "Process {:?} failed, retrying ({} retries left).",
197 thread_params.command, tries
198 );
199
200 sleep(Duration::from_millis(100));
201 match Process::start_child(&thread_params) {
202 Ok(new_child) => {
203 trace!("Locking the subprocess to replace the child with the new process.");
204 if let Ok(mut unlocked_child) = thread_child.write() {
205 unlocked_child.replace(new_child);
206 } else {
207 error!("Failed locking the child, the mutex might be poisoned.");
208 }
209 }
210 Err(err) => {
211 error!("Failed retrying the process: {err}.");
212 break;
213 }
214 }
215 }
216
217 trace!("Locking the subprocess to remove the child.");
218 if let Ok(mut unlocked_child) = thread_child.write() {
219 unlocked_child.take();
220 trace!("The failed process is removed.");
221 } else {
222 error!("Failed locking the child, the mutex might be poisoned.");
223 }
224
225 error!(
226 "Process {:?} {}, we are not retrying anymore.",
227 thread_params.command,
228 if max_retries > 0 {
229 format!("failed more than {max_retries} times")
230 } else {
231 "failed with 0 retries".to_string()
232 },
233 );
234 });
235
236 Ok(Process {
237 child,
238 #[cfg(unix)]
239 stop_signal: params.stop_signal,
240 #[cfg(unix)]
241 stop_timeout: params.stop_timeout,
242 })
243 }
244
245 #[cfg(unix)]
246 fn stop(&mut self) -> Result<(), ProcessError> {
247 use duration_string::DurationString;
248 use log::trace;
249 use nix::sys::signal::kill;
250 use nix::unistd::Pid;
251 use std::thread::sleep;
252 use std::time::Instant;
253
254 trace!("Locking the subprocess to stop it.");
255 if let Some(child) = self
256 .child
257 .read()
258 .map_err(|_| ProcessError::MutexPoisoned)?
259 .as_ref()
260 {
261 let pid = Pid::from_raw(
262 *child
263 .pids()
264 .first()
265 .ok_or(ProcessError::StopFailure("pid not found".to_string()))?
266 as i32,
267 );
268
269 trace!(
270 "Trying to stop process: sending {} to {}.",
271 self.stop_signal,
272 pid
273 );
274 kill(pid, self.stop_signal)?;
275
276 let start_time = Instant::now();
277 while start_time.elapsed() < self.stop_timeout {
278 if let Ok(Some(output)) = child.try_wait() {
279 info!("Process stopped gracefully with status {}.", output.status);
280 return Ok(());
281 }
282 sleep(Duration::from_secs(1));
283 }
284
285 debug!(
286 "Process didn't stop gracefully after {}. Killing process.",
287 DurationString::from(self.stop_timeout)
288 );
289
290 child
291 .kill()
292 .map_err(|err| ProcessError::StopFailure(err.to_string()))?;
293
294 info!("Process killed successfully.");
295 } else {
296 debug!("Cannot restart process, because it has already failed.");
297 }
298
299 Ok(())
300 }
301
302 #[cfg(not(unix))]
303 fn stop(&mut self) -> Result<(), ProcessError> {
304 trace!("Locking the subprocess to stop it.");
305 if let Some(child) = self
306 .child
307 .read()
308 .map_err(|_| ProcessError::MutexPoisoned)?
309 .as_ref()
310 {
311 child
312 .kill()
313 .map_err(|err| ProcessError::StopFailure(err.to_string()))?;
314
315 info!("Process stopped successfully.");
316 } else {
317 debug!("Cannot restart process, because it has already failed.");
318 }
319
320 Ok(())
321 }
322}
323
/// Action that owns a running process and restarts it on every run.
#[derive(Debug)]
pub struct ProcessAction {
    /// Parameters used every time the process is (re)started.
    params: ProcessParams,
    /// The currently supervised process.
    process: Process,
}
330
331impl ProcessAction {
332 pub fn new(params: ProcessParams) -> Result<ProcessAction, ProcessError> {
334 let process = Process::start(¶ms)?;
335
336 Ok(ProcessAction { params, process })
337 }
338
339 fn run_inner(&mut self) -> Result<(), ProcessError> {
340 self.process
341 .stop()
342 .map_err(|err| ProcessError::StopFailure(err.to_string()))?;
343 self.process = Process::start(&self.params)?;
344
345 Ok(())
346 }
347}
348
349impl Action for ProcessAction {
350 fn run(&mut self, _context: &Context) -> Result<(), ActionError> {
352 Ok(self.run_inner()?)
353 }
354}
355
#[cfg(test)]
#[cfg_attr(not(unix), allow(unused_imports))]
mod tests {
    use super::*;
    use std::{fs, time::Instant};
    use thread::sleep;

    const SLEEP_PARSING: &str = "sleep 100";
    const SLEEP_INVALID: &str = "sleep '100";
    const EXIT_NONZERO: &str = "exit 1";

    #[cfg(unix)]
    const SLEEP: &str = "sleep 100";

    #[cfg(not(unix))]
    const SLEEP: &str = "timeout /t 100";

    /// Builds parameters for `command`, run in the current directory.
    fn params_for(command: &str, runs_in_shell: bool) -> Result<ProcessParams, ProcessError> {
        ProcessParams::new(command.to_string(), ".".to_string(), runs_in_shell)
    }

    /// Returns the pids of the child currently held by `action`.
    ///
    /// Panics if the lock is poisoned or the child is already gone.
    fn child_pids(action: &ProcessAction) -> Vec<u32> {
        let guard = action.process.child.read().unwrap();
        guard.as_ref().unwrap().pids()
    }

    /// Returns whether `action` still holds a child.
    fn has_child(action: &ProcessAction) -> bool {
        action.process.child.read().unwrap().is_some()
    }

    #[test]
    fn it_should_start_a_new_process() -> Result<(), ProcessError> {
        let mut action = ProcessAction::new(params_for(SLEEP_PARSING, false)?)?;
        action.process.stop()?;

        assert_eq!("sleep", action.params.command);
        assert_eq!(".", action.params.directory);

        Ok(())
    }

    #[test]
    fn it_should_fail_if_command_is_invalid() -> Result<(), ProcessError> {
        // The unbalanced quote makes the command unparsable.
        let parse_error = params_for(SLEEP_INVALID, false).unwrap_err();

        assert_eq!(
            ProcessError::CommandParseFailure(String::from(SLEEP_INVALID)),
            parse_error,
        );

        Ok(())
    }

    #[test]
    #[cfg(unix)]
    fn it_should_fail_if_signal_is_invalid() -> Result<(), ProcessError> {
        let mut params = params_for(SLEEP, false)?;
        let signal_error = params
            .set_stop_signal(String::from("SIGWTF"))
            .unwrap_err();

        assert_eq!(
            ProcessError::SignalParseFailure(String::from("SIGWTF")),
            signal_error,
        );

        Ok(())
    }

    #[test]
    fn it_should_restart_the_process_gracefully() -> Result<(), ProcessError> {
        let stop_timeout = Duration::from_secs(5);
        let mut action = ProcessAction::new(params_for(SLEEP, false)?)?;

        let initial_time = Instant::now();
        let first_pid = child_pids(&action);
        action.run_inner()?;
        let second_pid = child_pids(&action);
        action.process.stop()?;

        assert_ne!(
            first_pid, second_pid,
            "First and second run should have different pids."
        );
        assert!(
            initial_time.elapsed() <= stop_timeout,
            "The stop timeout should not be elapsed."
        );

        Ok(())
    }

    #[test]
    fn it_should_retry_the_process_if_it_exits_until_the_retry_count() -> Result<(), ProcessError> {
        let action = ProcessAction::new(params_for(EXIT_NONZERO, true)?)?;

        // Give the supervisor thread time to notice the exit and drop the child.
        sleep(Duration::from_secs(1));

        let is_child_exited = !has_child(&action);

        assert!(is_child_exited, "The child should exit.");

        Ok(())
    }

    #[test]
    #[cfg(unix)]
    fn it_should_reset_the_retries() -> Result<(), ProcessError> {
        let tailed_file = "./test_directories/tailed_file";
        let command = format!("tail -f {tailed_file}");

        // The file is only written below, after the first start; presumably
        // the first `tail` exits right away because of that.
        let mut action = ProcessAction::new(params_for(&command, false)?)?;

        // With the file in place a restart should leave a running child behind.
        fs::write(tailed_file, "").unwrap();
        action.run_inner()?;

        let is_child_running = has_child(&action);
        assert!(is_child_running, "The child should be running.");

        action.process.stop()?;
        fs::remove_file(tailed_file).unwrap();

        Ok(())
    }
}