gw_bin/actions/
process.rs

1use super::{utils::command::create_command, Action, ActionError};
2use crate::context::Context;
3use duct::{Expression, ReaderHandle};
4use log::{debug, error, info, trace, warn};
5use std::{
6    io::{BufRead, BufReader},
7    sync::{Arc, RwLock},
8    thread::{self, sleep},
9    time::Duration,
10};
11use thiserror::Error;
12
13#[cfg(unix)]
14use nix::{errno::Errno, sys::signal::Signal};
15#[cfg(unix)]
16use std::{os::unix::process::ExitStatusExt, str::FromStr};
17
/// Name of this action; handed to the spawned child through the
/// `GW_ACTION_NAME` environment variable.
const ACTION_NAME: &str = "PROCESS";
19
20/// Custom error describing the error cases for the ProcessAction.
21#[derive(Debug, Error, PartialEq, Eq)]
22pub enum ProcessError {
23    /// The command is invalid (usually mismatched quotations etc.).
24    #[error("the command {0:?} cannot be parsed")]
25    CommandParseFailure(String),
26    /// Signal is not a valid UNIX signal.
27    #[error("the signal {0} is not valid")]
28    SignalParseFailure(String),
29    /// The underlying Rust command creation failed. The parameter contains the error.
30    #[error("the script cannot start: {0}")]
31    StartFailure(String),
32    /// Stopping the command failed.
33    #[error("the script cannot be stopped: {0}")]
34    StopFailure(String),
35    /// Killing the command failed.
36    #[cfg(unix)]
37    #[error("killing the process failed with error: {0}")]
38    KillFailed(#[from] Errno),
39    /// The lock on the child is poisoned: this means the thread failed while holding the lock.
40    #[error("the mutex is poisoned")]
41    MutexPoisoned,
42}
43
44impl From<ProcessError> for ActionError {
45    fn from(value: ProcessError) -> Self {
46        match value {
47            ProcessError::CommandParseFailure(_) | ProcessError::SignalParseFailure(_) => {
48                ActionError::Misconfigured(value.to_string())
49            }
50            _ => ActionError::FailedAction(value.to_string()),
51        }
52    }
53}
54
55/// Parameters for the process.
56#[derive(Debug, Clone)]
57pub struct ProcessParams {
58    directory: String,
59    command: String,
60    process: Expression,
61    retries: u32,
62    #[cfg(unix)]
63    stop_signal: Signal,
64    #[cfg(unix)]
65    stop_timeout: Duration,
66    runs_in_shell: bool,
67}
68
69impl ProcessParams {
70    pub fn new(
71        original_command: String,
72        directory: String,
73        runs_in_shell: bool,
74    ) -> Result<ProcessParams, ProcessError> {
75        let (command, process) = create_command(&original_command, runs_in_shell)
76            .ok_or(ProcessError::CommandParseFailure(original_command.clone()))?;
77
78        Ok(ProcessParams {
79            directory,
80            command,
81            process,
82            retries: 0,
83            #[cfg(unix)]
84            stop_signal: Signal::SIGTERM,
85            #[cfg(unix)]
86            stop_timeout: Duration::from_secs(10),
87            runs_in_shell,
88        })
89    }
90
91    pub fn set_retries(&mut self, retries: u32) {
92        self.retries = retries;
93    }
94
95    #[cfg_attr(not(unix), allow(unused_variables))]
96    pub fn set_stop_signal(&mut self, stop_signal: String) -> Result<(), ProcessError> {
97        #[cfg(unix)]
98        {
99            self.stop_signal = Signal::from_str(&stop_signal)
100                .map_err(|_| ProcessError::SignalParseFailure(stop_signal))?;
101        }
102
103        Ok(())
104    }
105
106    #[cfg_attr(not(unix), allow(unused_variables))]
107    pub fn set_stop_timeout(&mut self, stop_timeout: Duration) {
108        #[cfg(unix)]
109        {
110            self.stop_timeout = stop_timeout;
111        }
112    }
113}
114
115/// Struct that can handle the lifecycle of the process with restarting etc.
116#[derive(Debug)]
117#[cfg_attr(unix, allow(dead_code))]
118pub struct Process {
119    child: Arc<RwLock<Option<ReaderHandle>>>,
120    #[cfg(unix)]
121    stop_signal: Signal,
122    #[cfg(unix)]
123    stop_timeout: Duration,
124}
125
126impl Process {
127    fn start_child(params: &ProcessParams) -> Result<ReaderHandle, ProcessError> {
128        info!(
129            "Starting process {:?} {}in {}.",
130            params.command,
131            if params.runs_in_shell {
132                "in a shell "
133            } else {
134                ""
135            },
136            params.directory,
137        );
138
139        // Create child
140        let child = params
141            .process
142            .dir(&params.directory)
143            .stderr_to_stdout()
144            .env("CI", "true")
145            .env("GW_ACTION_NAME", ACTION_NAME)
146            .env("GW_DIRECTORY", &params.directory)
147            .unchecked()
148            .reader()
149            .map_err(|err| ProcessError::StartFailure(err.to_string()))?;
150
151        if let Some(pid) = child.pids().first() {
152            trace!("Started process with pid {pid}.",);
153        }
154
155        Ok(child)
156    }
157
158    fn start(params: &ProcessParams) -> Result<Process, ProcessError> {
159        let child = Arc::new(RwLock::new(Some(Process::start_child(params)?)));
160
161        let command_id = params.command.clone();
162        let max_retries = params.retries;
163        let thread_params = params.clone();
164        let thread_child = child.clone();
165        thread::spawn(move || {
166            let mut tries = max_retries + 1;
167
168            loop {
169                trace!("Locking the subprocess to get the stdout.");
170                if let Some(stdout) = thread_child.read().unwrap().as_ref() {
171                    let mut reader = BufReader::new(stdout).lines();
172                    trace!("Reading lines from the stdout.");
173                    while let Some(Ok(line)) = reader.next() {
174                        debug!("[{command_id}] {line}");
175                    }
176
177                    #[cfg_attr(not(unix), allow(unused_variables))]
178                    if let Ok(Some(output)) = stdout.try_wait() {
179                        #[cfg(unix)]
180                        if output.status.signal().is_some() {
181                            trace!("Process is signalled, no retries necessary.");
182                            return;
183                        }
184                    }
185                } else {
186                    error!("Failed taking the stdout of process.");
187                    break;
188                }
189
190                tries -= 1;
191                if tries == 0 {
192                    break;
193                }
194
195                warn!(
196                    "Process {:?} failed, retrying ({} retries left).",
197                    thread_params.command, tries
198                );
199
200                sleep(Duration::from_millis(100));
201                match Process::start_child(&thread_params) {
202                    Ok(new_child) => {
203                        trace!("Locking the subprocess to replace the child with the new process.");
204                        if let Ok(mut unlocked_child) = thread_child.write() {
205                            unlocked_child.replace(new_child);
206                        } else {
207                            error!("Failed locking the child, the mutex might be poisoned.");
208                        }
209                    }
210                    Err(err) => {
211                        error!("Failed retrying the process: {err}.");
212                        break;
213                    }
214                }
215            }
216
217            trace!("Locking the subprocess to remove the child.");
218            if let Ok(mut unlocked_child) = thread_child.write() {
219                unlocked_child.take();
220                trace!("The failed process is removed.");
221            } else {
222                error!("Failed locking the child, the mutex might be poisoned.");
223            }
224
225            error!(
226                "Process {:?} {}, we are not retrying anymore.",
227                thread_params.command,
228                if max_retries > 0 {
229                    format!("failed more than {max_retries} times")
230                } else {
231                    "failed with 0 retries".to_string()
232                },
233            );
234        });
235
236        Ok(Process {
237            child,
238            #[cfg(unix)]
239            stop_signal: params.stop_signal,
240            #[cfg(unix)]
241            stop_timeout: params.stop_timeout,
242        })
243    }
244
245    #[cfg(unix)]
246    fn stop(&mut self) -> Result<(), ProcessError> {
247        use duration_string::DurationString;
248        use log::trace;
249        use nix::sys::signal::kill;
250        use nix::unistd::Pid;
251        use std::thread::sleep;
252        use std::time::Instant;
253
254        trace!("Locking the subprocess to stop it.");
255        if let Some(child) = self
256            .child
257            .read()
258            .map_err(|_| ProcessError::MutexPoisoned)?
259            .as_ref()
260        {
261            let pid = Pid::from_raw(
262                *child
263                    .pids()
264                    .first()
265                    .ok_or(ProcessError::StopFailure("pid not found".to_string()))?
266                    as i32,
267            );
268
269            trace!(
270                "Trying to stop process: sending {} to {}.",
271                self.stop_signal,
272                pid
273            );
274            kill(pid, self.stop_signal)?;
275
276            let start_time = Instant::now();
277            while start_time.elapsed() < self.stop_timeout {
278                if let Ok(Some(output)) = child.try_wait() {
279                    info!("Process stopped gracefully with status {}.", output.status);
280                    return Ok(());
281                }
282                sleep(Duration::from_secs(1));
283            }
284
285            debug!(
286                "Process didn't stop gracefully after {}. Killing process.",
287                DurationString::from(self.stop_timeout)
288            );
289
290            child
291                .kill()
292                .map_err(|err| ProcessError::StopFailure(err.to_string()))?;
293
294            info!("Process killed successfully.");
295        } else {
296            debug!("Cannot restart process, because it has already failed.");
297        }
298
299        Ok(())
300    }
301
302    #[cfg(not(unix))]
303    fn stop(&mut self) -> Result<(), ProcessError> {
304        trace!("Locking the subprocess to stop it.");
305        if let Some(child) = self
306            .child
307            .read()
308            .map_err(|_| ProcessError::MutexPoisoned)?
309            .as_ref()
310        {
311            child
312                .kill()
313                .map_err(|err| ProcessError::StopFailure(err.to_string()))?;
314
315            info!("Process stopped successfully.");
316        } else {
317            debug!("Cannot restart process, because it has already failed.");
318        }
319
320        Ok(())
321    }
322}
323
324/// An action to run in the background and restart a subprocess.
325#[derive(Debug)]
326pub struct ProcessAction {
327    params: ProcessParams,
328    process: Process,
329}
330
331impl ProcessAction {
332    /// Creates a new process in the background.
333    pub fn new(params: ProcessParams) -> Result<ProcessAction, ProcessError> {
334        let process = Process::start(&params)?;
335
336        Ok(ProcessAction { params, process })
337    }
338
339    fn run_inner(&mut self) -> Result<(), ProcessError> {
340        self.process
341            .stop()
342            .map_err(|err| ProcessError::StopFailure(err.to_string()))?;
343        self.process = Process::start(&self.params)?;
344
345        Ok(())
346    }
347}
348
349impl Action for ProcessAction {
350    /// Kills and restarts the subprocess.
351    fn run(&mut self, _context: &Context) -> Result<(), ActionError> {
352        Ok(self.run_inner()?)
353    }
354}
355
#[cfg(test)]
#[cfg_attr(not(unix), allow(unused_imports))]
mod tests {
    use super::*;
    use std::{fs, time::Instant};
    use thread::sleep;

    const SLEEP_PARSING: &str = "sleep 100";
    const SLEEP_INVALID: &str = "sleep '100";
    const EXIT_NONZERO: &str = "exit 1";

    #[cfg(unix)]
    const SLEEP: &str = "sleep 100";

    #[cfg(not(unix))]
    const SLEEP: &str = "timeout /t 100";

    /// Returns the pids of the child currently tracked by `action`.
    /// Panics when the lock is poisoned or no child is tracked.
    fn tracked_pids(action: &ProcessAction) -> Vec<u32> {
        let guard = action.process.child.read().unwrap();
        guard.as_ref().unwrap().pids()
    }

    /// Tells whether `action` still tracks a child.
    /// Panics when the lock is poisoned.
    fn has_child(action: &ProcessAction) -> bool {
        action.process.child.read().unwrap().as_ref().is_some()
    }

    #[test]
    fn it_should_start_a_new_process() -> Result<(), ProcessError> {
        let params = ProcessParams::new(SLEEP_PARSING.to_string(), ".".to_string(), false)?;
        let mut action = ProcessAction::new(params)?;
        action.process.stop()?;

        assert_eq!("sleep", action.params.command);
        assert_eq!(".", action.params.directory);

        Ok(())
    }

    #[test]
    fn it_should_fail_if_command_is_invalid() -> Result<(), ProcessError> {
        let invalid_command = SLEEP_INVALID.to_string();
        let outcome = ProcessParams::new(invalid_command.clone(), ".".to_string(), false);

        assert_eq!(
            ProcessError::CommandParseFailure(invalid_command),
            outcome.unwrap_err(),
        );

        Ok(())
    }

    #[test]
    #[cfg(unix)]
    fn it_should_fail_if_signal_is_invalid() -> Result<(), ProcessError> {
        let invalid_signal = "SIGWTF".to_string();
        let mut params = ProcessParams::new(SLEEP.to_string(), ".".to_string(), false)?;
        let outcome = params.set_stop_signal(invalid_signal.clone());

        assert_eq!(
            ProcessError::SignalParseFailure(invalid_signal),
            outcome.unwrap_err(),
        );

        Ok(())
    }

    #[test]
    fn it_should_restart_the_process_gracefully() -> Result<(), ProcessError> {
        let stop_timeout = Duration::from_secs(5);
        let params = ProcessParams::new(SLEEP.to_string(), ".".to_string(), false)?;
        let mut action = ProcessAction::new(params)?;

        let started_at = Instant::now();
        let pids_before = tracked_pids(&action);
        action.run_inner()?;
        let pids_after = tracked_pids(&action);
        action.process.stop()?;

        assert_ne!(
            pids_before, pids_after,
            "First and second run should have different pids."
        );
        assert!(
            started_at.elapsed() <= stop_timeout,
            "The stop timeout should not be elapsed."
        );

        Ok(())
    }

    #[test]
    fn it_should_retry_the_process_if_it_exits_until_the_retry_count() -> Result<(), ProcessError> {
        let params = ProcessParams::new(EXIT_NONZERO.to_string(), ".".to_string(), true)?;
        let action = ProcessAction::new(params)?;

        // Give the watcher thread time to notice the exit and drop the child.
        sleep(Duration::from_secs(1));

        let is_child_exited = !has_child(&action);

        assert!(is_child_exited, "The child should exit.");

        Ok(())
    }

    #[test]
    #[cfg(unix)]
    fn it_should_reset_the_retries() -> Result<(), ProcessError> {
        let tailed_file = "./test_directories/tailed_file";
        let command = format!("tail -f {tailed_file}");
        let params = ProcessParams::new(command, ".".to_string(), false)?;

        // The first start is expected to fail: the tailed file is not there yet.
        let mut action = ProcessAction::new(params)?;

        // Create the file, then restart right away to check the retries start over.
        fs::write(tailed_file, "").unwrap();
        action.run_inner()?;

        let is_child_running = has_child(&action);
        assert!(is_child_running, "The child should be running.");

        action.process.stop()?;
        fs::remove_file(tailed_file).unwrap();

        Ok(())
    }
}
485        Ok(())
486    }
487}