steward/
process.rs

1use std::{
2    io,
3    process::{Output, Stdio},
4    sync::{
5        atomic::{AtomicBool, AtomicUsize, Ordering},
6        Arc,
7    },
8    time::{Duration, Instant},
9};
10
11use console::Color;
12use tokio::{
13    io::{AsyncBufReadExt, BufReader},
14    process::{Child, ChildStderr, ChildStdout},
15    signal, task, time,
16};
17
18use crate::{Cmd, Dependency, Error, KillTimeout, Location, Result, SpawnOptions};
19
20/// Long running process. Can be constructed via [`Process::new`](Process::new) or convenience [`process!`](crate::process!) macro.
21pub struct Process<Loc> {
22    /// Tag used as an identificator in output when process runs as a part of a [`ProcessPool`](ProcessPool).
23    pub tag: &'static str,
24    /// [Command](Cmd) to run a process.
25    pub cmd: Cmd<Loc>,
26    /// Amount of time to wait before killing hanged process. See [`KillTimeout`](crate::KillTimeout).
27    pub timeout: KillTimeout,
28}
29
/// Why [`RunningProcess::wait`] stopped waiting for the child.
enum TeardownReason {
    /// `Ctrl+C` was received before the child finished.
    CtrlC,
    /// The child finished on its own; carries the result of collecting its output.
    ProcessFinished(io::Result<Output>),
}
34
/// What happened to the child after `Ctrl+C` was received.
enum CtrlCResult {
    /// The child exited before the kill timeout ran out.
    ProcessExited,
    /// The kill timeout ran out while the child was still running.
    Timeout,
}
39
/// Successful outcomes of [`RunningProcess::wait`].
pub(crate) enum ExitResult {
    /// The process finished on its own with a successful status; carries its collected output.
    Output(Output),
    /// The process exited within its kill timeout after `Ctrl+C`.
    Interrupted,
    /// The process outlived its kill timeout after `Ctrl+C` and was killed.
    Killed { pid: u32 },
}
45
46impl<Loc> Process<Loc>
47where
48    Loc: Location,
49{
50    /// Constructs a new process.
51    pub fn new(tag: &'static str, cmd: Cmd<Loc>, timeout: KillTimeout) -> Self {
52        Self { tag, cmd, timeout }
53    }
54
55    /// Returns a tag of a process.
56    pub fn tag(&self) -> &'static str {
57        self.tag
58    }
59
60    /// Returns a command of a process.
61    pub fn cmd(&self) -> &Cmd<Loc> {
62        &self.cmd
63    }
64
65    /// Returns a timeout of a process.
66    pub fn timeout(&self) -> &KillTimeout {
67        &self.timeout
68    }
69
70    /// Spawns a process and returns a [`RunningProcess`](RunningProcess),
71    /// which includes a [`Child`](tokio::process::Child).
72    pub async fn spawn(&self, opts: SpawnOptions) -> io::Result<RunningProcess> {
73        self.cmd().spawn(opts)
74    }
75}
76
/// Convenience macro for creating a [`Process`](crate::Process).
///
/// ## Examples
/// A process with the default [`KillTimeout`](crate::KillTimeout):
/// ```ignore
/// process! {
///   tag: "server",
///   cmd: cmd! { ... },
/// }
/// ```
///
/// A process with a specific timeout:
/// ```ignore
/// use std::time::Duration;
///
/// process! {
///   tag: "server",
///   cmd: cmd! { ... },
///   timeout: Duration::from_secs(20).into(),
/// }
/// ```
#[macro_export]
macro_rules! process {
    // Explicit kill timeout.
    (
        tag: $tag:expr,
        cmd: $cmd:expr,
        timeout: $timeout:expr $(,)?
    ) => {
        $crate::Process::new($tag, $cmd, $timeout)
    };
    // No timeout given: same as the arm above with the default kill timeout.
    (
        tag: $tag:expr,
        cmd: $cmd:expr $(,)?
    ) => {
        $crate::process!(
            tag: $tag,
            cmd: $cmd,
            timeout: $crate::KillTimeout::default(),
        )
    };
}
122
123/// Wrapper around a running child process.
124pub struct RunningProcess {
125    pub(crate) process: Child,
126    pub(crate) timeout: KillTimeout,
127}
128
129impl RunningProcess {
130    /// Returns a reference to the underlying [`Child`](tokio::process::Child) process.
131    pub fn as_child(&self) -> &Child {
132        &self.process
133    }
134
135    /// Consumes the instance and gives a handle to the underlying [`Child`](tokio::process::Child) process.
136    pub fn into_child(self) -> Child {
137        self.process
138    }
139
140    pub(crate) fn stdout(&mut self) -> Option<ChildStdout> {
141        self.process.stdout.take()
142    }
143
144    pub(crate) fn stderr(&mut self) -> Option<ChildStderr> {
145        self.process.stderr.take()
146    }
147
148    pub(crate) async fn wait(self) -> Result<ExitResult> {
149        let process = self.process;
150
151        let pid = match process.id() {
152            Some(pid) => pid,
153            None => return Err(Error::ProcessDoesNotExist),
154        };
155
156        let process_exited = Arc::new(AtomicBool::new(false));
157
158        let exit_reason = {
159            let process_exited = process_exited.clone();
160
161            let process_task = task::spawn(async move {
162                let res = process.wait_with_output().await;
163                process_exited.store(true, Ordering::SeqCst);
164                res
165            });
166
167            tokio::select! {
168                result =
169                  process_task =>
170                    TeardownReason::ProcessFinished(
171                      result.unwrap_or_else(|err| Err(io::Error::new(io::ErrorKind::Other, err)))
172                    ),
173                _ = signal::ctrl_c() => TeardownReason::CtrlC,
174            }
175        };
176
177        match exit_reason {
178            TeardownReason::ProcessFinished(result) => {
179                let output = result?;
180                if output.status.success() {
181                    Ok(ExitResult::Output(output))
182                } else {
183                    Err(output.into())
184                }
185            }
186            TeardownReason::CtrlC => {
187                let res = {
188                    let process_exited = process_exited.clone();
189                    let exit_checker = task::spawn(async move {
190                        loop {
191                            if process_exited.load(Ordering::SeqCst) {
192                                break;
193                            }
194                        }
195                    });
196                    tokio::select! {
197                        _ = exit_checker => CtrlCResult::ProcessExited,
198                        _ = time::sleep(*self.timeout) => CtrlCResult::Timeout,
199                    }
200                };
201
202                match res {
203                    CtrlCResult::ProcessExited => Ok(ExitResult::Interrupted),
204                    CtrlCResult::Timeout => match Self::kill(pid) {
205                        Ok(()) => Ok(ExitResult::Killed { pid }),
206                        Err(err) => Err(err),
207                    },
208                }
209            }
210        }
211    }
212
213    /// Tries to safely terminate a running process. If the termination didn't succeed, tries to kill it.
214    #[cfg(unix)]
215    pub async fn stop(mut self) -> Result<()> {
216        use nix::{
217            sys::signal::{self, Signal},
218            unistd::Pid,
219        };
220
221        match self.process.id() {
222            None => Err(Error::ProcessDoesNotExist),
223            Some(pid) => match signal::kill(Pid::from_raw(pid as i32), Signal::SIGINT) {
224                Ok(()) => {
225                    let process = &mut self.process;
226
227                    let res = tokio::select! {
228                        res = process.wait() => Some(res),
229                        _ = time::sleep(*self.timeout) => None,
230                    };
231
232                    match res {
233                        Some(Ok(_)) => Ok(()),
234                        Some(Err(error)) => {
235                            eprintln!("⚠️ IO error on SIGINT: {error}. Killing the process {pid}.");
236                            Self::kill(pid)
237                        }
238                        None => {
239                            eprintln!("⚠️ SIGINT timeout. Killing the process {pid}.");
240                            Self::kill(pid)
241                        }
242                    }
243                }
244                Err(error) => {
245                    eprintln!("⚠️ Failed to terminate the process {pid}. {error}. Killing it.");
246                    Self::kill(pid)
247                }
248            },
249        }
250    }
251
    // TODO: Implement `RunningProcess::stop` for Windows.
253
254    #[cfg(unix)]
255    pub(crate) fn kill(pid: u32) -> Result<()> {
256        use nix::{
257            sys::signal::{self, Signal},
258            unistd::Pid,
259        };
260
261        signal::kill(Pid::from_raw(pid as i32), Signal::SIGKILL)
262            .map_err(|err| Error::Zombie { pid, err })
263    }
264
265    #[cfg(windows)]
266    pub(crate) fn kill(pid: u32) -> Result<()> {
267        use winapi::{
268            shared::{
269                minwindef::{BOOL, DWORD, FALSE, UINT},
270                ntdef::NULL,
271            },
272            um::{
273                errhandlingapi::GetLastError,
274                handleapi::CloseHandle,
275                processthreadsapi::{OpenProcess, TerminateProcess},
276                winnt::{HANDLE, PROCESS_TERMINATE},
277            },
278        };
279
280        // since we only wish to kill the process
281        const DESIRED_ACCESS: DWORD = PROCESS_TERMINATE;
282
283        const INHERIT_HANDLE: BOOL = FALSE;
284
285        // for some reason windows doesn't have any exit codes,
286        // you just use what ever you want?
287        //
288        // so we're using exit code `0` then
289        const EXIT_CODE: UINT = 0;
290
291        // windows being window you have to call this a lot
292        // so i just extracted it to its own function
293        unsafe fn get_error(pid: u32) -> Result<()> {
294            // https://docs.microsoft.com/en-us/windows/win32/api/errhandlingapi/nf-errhandlingapi-getlasterror
295            let err: DWORD = GetLastError();
296
297            Err(Error::Zombie { pid, err })
298        }
299
300        unsafe {
301            // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-openprocess
302            let handle: HANDLE = OpenProcess(DESIRED_ACCESS, INHERIT_HANDLE, pid);
303            if handle == NULL {
304                get_error(pid)?;
305            }
306
307            // https://docs.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-terminateprocess
308            let terminate_result: BOOL = TerminateProcess(handle, EXIT_CODE);
309            if terminate_result == FALSE {
310                get_error(pid)?;
311            }
312
313            // https://docs.microsoft.com/en-us/windows/win32/api/handleapi/nf-handleapi-closehandle
314            let close_result: BOOL = CloseHandle(handle);
315            if close_result == FALSE {
316                get_error(pid)?;
317            }
318        }
319
320        Ok(())
321    }
322}
323
324/// Entry of a [`ProcessPool`](ProcessPool) when some of the processes depend on something.
325/// It is used as an input to the [`ProcessPool::run_with_deps`](ProcessPool::run_with_deps) method.
326/// See [`dep`](crate::dep) module documentation.
327pub enum PoolEntry<Loc, Dep: ?Sized> {
328    /// An indipendent long-running process.
329    Process(Process<Loc>),
330    /// A long-running process that depends on some other thing.
331    ProcessWithDep {
332        /// The process.
333        process: Process<Loc>,
334        /// The dependency. See [`Dependency`](Dependency).
335        dependency: Box<Dep>,
336    },
337}
338
339impl<Loc> PoolEntry<Loc, dyn Dependency>
340where
341    Loc: Location + 'static,
342{
343    fn process(&self) -> &Process<Loc> {
344        match self {
345            Self::Process(process) => process,
346            Self::ProcessWithDep {
347                process,
348                dependency: _,
349            } => process,
350        }
351    }
352
353    fn take(self) -> (Process<Loc>, Option<Box<dyn Dependency>>) {
354        match self {
355            Self::Process(process) => (process, None),
356            Self::ProcessWithDep {
357                process,
358                dependency,
359            } => (process, Some(dependency)),
360        }
361    }
362}
363
/// Runner for a pool of long-running processes.
///
/// ```ignore
/// ProcessPool::run(vec![process_1, process_2]).await
/// ```
pub struct ProcessPool;
370
371impl ProcessPool {
372    /// Runs a pool of long-running processes.
373    pub async fn run<Loc>(pool: Vec<Process<Loc>>) -> Result<()>
374    where
375        Loc: Location + 'static,
376    {
377        let pool = pool.into_iter().map(|p| PoolEntry::Process(p)).collect();
378        ProcessPool::runner::<Loc>(pool).await
379    }
380
381    /// Runs a pool of long-running processes, some of which depend on something,
382    /// such as an HTTP service being available or a file existing.
383    /// See [`dep`](crate::dep) module documentation.
384    pub async fn run_with_deps<Loc>(pool: Vec<PoolEntry<Loc, dyn Dependency>>) -> Result<()>
385    where
386        Loc: Location + 'static,
387    {
388        ProcessPool::runner(pool).await
389    }
390
391    async fn runner<Loc>(pool: Vec<PoolEntry<Loc, dyn Dependency>>) -> Result<()>
392    where
393        Loc: Location + 'static,
394    {
395        let pool_size = pool.len();
396        let exited_processes = Arc::new(AtomicUsize::new(0));
397
398        let (tag_col_length, timeout) =
399            pool.iter()
400                .fold((0, Duration::default()), |(len, timeout), entry| {
401                    let process = entry.process();
402                    let len = {
403                        let tag_len = process.tag().len();
404                        if tag_len > len {
405                            tag_len
406                        } else {
407                            len
408                        }
409                    };
410                    let timeout = if *process.timeout > timeout {
411                        *process.timeout
412                    } else {
413                        timeout
414                    };
415                    (len, timeout)
416                });
417
418        let colors = colors::make(pool_size as u8);
419        let processes: Vec<(PoolEntry<Loc, dyn Dependency>, Color)> =
420            pool.into_iter().zip(colors).collect();
421
422        let processes_list = processes.iter().fold(String::new(), |acc, (entry, color)| {
423            let process = entry.process();
424            let styled = console::style(process.tag().to_string()).fg(*color).bold();
425            if acc.is_empty() {
426                styled.to_string()
427            } else {
428                format!("{}, {}", acc, styled)
429            }
430        });
431
432        eprintln!("❯ {} {}", console::style("Running:").bold(), processes_list);
433
434        for (entry, color) in processes {
435            let exited_processes = exited_processes.clone();
436
437            task::spawn(async move {
438                let (process, dependency) = entry.take();
439                let tag = process.tag();
440                let cmd = process.cmd();
441                let timeout = process.timeout();
442                let colored_tag = console::style(tag.to_owned()).fg(color).bold();
443                let colored_tag_col = {
444                    let len = tag.len();
445                    let pad = " ".repeat(if len < tag_col_length {
446                        tag_col_length - len + 2
447                    } else {
448                        2
449                    });
450                    console::style(format!(
451                        "{tag}{pad}{pipe}",
452                        tag = colored_tag,
453                        pad = pad,
454                        pipe = console::style("|").fg(color).bold()
455                    ))
456                };
457
458                let dep_res = match dependency {
459                    None => Ok(()),
460                    Some(dependency) => {
461                        let dep_tag = console::style(dependency.tag()).bold();
462
463                        eprintln!(
464                            "{col} {process} is waiting for its {dep} dependency...",
465                            col = colored_tag_col,
466                            dep = dep_tag,
467                            process = colored_tag
468                        );
469
470                        let res = dependency.wait().await;
471                        if let Err(error) = &res {
472                            eprintln!(
473                                "{col} ❗️ {dep} dependency of {process} errored: {error}\nNot executing {process}.",
474                                col = colored_tag_col,
475                                dep = dep_tag,
476                                process = colored_tag,
477                                error = error
478                            );
479                        }
480                        res
481                    }
482                };
483
484                if let Ok(()) = dep_res {
485                    eprintln!(
486                        "{tag} {headline}",
487                        tag = colored_tag_col,
488                        headline = crate::headline!(cmd),
489                    );
490
491                    let opts = SpawnOptions {
492                        stdout: Stdio::piped(),
493                        stderr: Stdio::piped(),
494                        timeout: timeout.to_owned(),
495                    };
496
497                    let mut process = process.spawn(opts).await.unwrap_or_else(|err| {
498                        panic!("Failed to spawn {} process. {}", colored_tag, err)
499                    });
500
501                    match process.stdout() {
502                        None => eprintln!(
503                            "{} Unable to read from {} stdout",
504                            colored_tag_col, colored_tag
505                        ),
506                        Some(stdout) => {
507                            let mut reader = BufReader::new(stdout).lines();
508                            task::spawn({
509                                let tag = colored_tag_col.clone();
510                                async move {
511                                    while let Some(line) = reader.next_line().await.unwrap() {
512                                        eprintln!("{} {}", tag, line);
513                                    }
514                                }
515                            });
516                        }
517                    }
518
519                    match process.stderr() {
520                        None => eprintln!(
521                            "{} Unable to read from {} stderr",
522                            colored_tag_col, colored_tag
523                        ),
524                        Some(stderr) => {
525                            let mut reader = BufReader::new(stderr).lines();
526                            task::spawn({
527                                let tag = colored_tag_col.clone();
528                                async move {
529                                    while let Some(line) = reader.next_line().await.unwrap() {
530                                        eprintln!("{} {}", tag, line);
531                                    }
532                                }
533                            });
534                        }
535                    }
536
537                    let res = process.wait().await;
538
539                    match res {
540                        Ok(ExitResult::Output(_)) => eprintln!(
541                            "{} Process {} exited with code 0.",
542                            colored_tag_col, colored_tag
543                        ),
544                         Ok(ExitResult::Interrupted) => eprintln!(
545                            "{} Process {} successfully exited.",
546                            colored_tag_col, colored_tag
547                        ),
548                        Ok(ExitResult::Killed { pid }) => eprintln!(
549                            "{} Process {} with pid {pid} was killed due to timeout.",
550                            colored_tag_col,
551                            colored_tag,
552                        ),
553                        Err(Error::NonZeroExitCode { code, output: _ }) => eprintln!(
554                            "{} Process {} exited with non-zero code: {}",
555                            colored_tag_col,
556                            colored_tag,
557                            code.map(|x| format!("{}", x)).unwrap_or_else(|| "-".to_string())
558                        ),
559                        Err(Error::ProcessDoesNotExist) => eprintln!(
560                            "{} ⚠️  Process {} does not exist.",
561                            colored_tag_col, colored_tag
562                        ),
563                        Err(Error::Zombie { pid, err }) => eprintln!(
564                            "{} ⚠️  Process {} with pid {} hanged and we were unable to kill it. Error: {}",
565                            colored_tag_col, colored_tag, pid, err
566                        ),
567                        Err(Error::IoError(err)) => eprintln!(
568                            "{} Process {} exited with error: {}",
569                            colored_tag_col, colored_tag, err
570                        ),
571                    }
572                }
573
574                exited_processes.fetch_add(1, Ordering::Relaxed);
575            });
576        }
577
578        signal::ctrl_c().await.unwrap();
579        eprintln!(); // Prints `^C` in terminal on its own line
580
581        let expire = Instant::now() + timeout;
582        while exited_processes.load(Ordering::Relaxed) < pool_size {
583            if Instant::now() > expire {
584                eprintln!("⚠️  Timeout. Exiting.");
585                break;
586            }
587            time::sleep(Duration::from_millis(500)).await;
588        }
589
590        Ok(())
591    }
592}
593
594mod colors {
595    use console::Color;
596    use rand::{seq::SliceRandom, thread_rng};
597
598    pub fn make(n: u8) -> Vec<Color> {
599        // Preferred colors
600        let mut primaries = vec![
601            // Color::Red, // Red is for errors
602            Color::Green,
603            Color::Yellow,
604            Color::Blue,
605            Color::Magenta,
606            Color::Cyan,
607        ];
608        // Not as good as primaries, but good enough to distinct processes
609        let secondaries = vec![
610            Color::Color256(24),
611            Color::Color256(172),
612            Color::Color256(142),
613        ];
614
615        // Let's check first if we can get away with just primary colors
616        if n <= primaries.len() as u8 {
617            shuffle(primaries, n)
618        }
619        // Otherwise, let's check if primary + secondary combined would work
620        else if n <= (primaries.len() + primaries.len()) as u8 {
621            primaries.extend(secondaries);
622            shuffle(primaries, n)
623        } else {
624            // TODO: Duplicate primary + secondary colors vec as many is needed, then shuffle
625            todo!()
626        }
627    }
628
629    fn shuffle<T>(mut items: Vec<T>, n: u8) -> Vec<T> {
630        items.truncate(n as usize);
631        items.shuffle(&mut thread_rng());
632        items
633    }
634}
635
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{Cmd, Location, Process};

    // The functions below are never called: compiling them is the test. They make sure
    // both forms of the `process!` macro expand to a `Process`.

    /// `process!` with an explicit timeout.
    #[allow(dead_code)]
    fn process_macro_with_timeout<Loc>(cmd: Cmd<Loc>) -> Process<Loc>
    where
        Loc: Location,
    {
        process! {
          tag: "server",
          cmd: cmd,
          timeout: Duration::from_secs(20).into(),
        }
    }

    /// `process!` with the default timeout.
    #[allow(dead_code)]
    fn process_macro_without_timeout<Loc>(cmd: Cmd<Loc>) -> Process<Loc>
    where
        Loc: Location,
    {
        process! {
          tag: "server",
          cmd: cmd,
        }
    }
}
658}