process_muxer/
lib.rs

1use std::{
2    io::{self, stderr, stdout, LineWriter, Write},
3    os::unix::process::CommandExt,
4    path::{Path, PathBuf},
5    process::{Command, ExitStatus, Stdio},
6};
7
8use console::Style;
9pub use process_muxer_core::{ChildInfo, Event, FdTag, Pid, Signal};
10use regex::Regex;
11
12pub trait MuxerHook {
13    fn before_event<'a>(&mut self, event: &Event<'a>);
14    fn before_spawn(&mut self, command: &Command);
15}
16
17pub struct Muxer {
18    inner: process_muxer_core::Muxer,
19    hooks: Vec<Box<dyn MuxerHook>>,
20}
21
22impl Muxer {
23    pub fn new() -> io::Result<Self> {
24        let res = Muxer {
25            inner: process_muxer_core::Muxer::new()?,
26            hooks: Vec::new(),
27        };
28        Ok(res)
29    }
30
31    pub fn add_hook<T: MuxerHook + 'static>(&mut self, hook: T) {
32        self.hooks.push(Box::new(hook));
33    }
34
35    pub fn pump<R, F>(&mut self, mut func: F) -> R
36    where
37        F: FnMut(Event) -> Option<R>,
38    {
39        self.inner.pump(|ev| {
40            for hook in self.hooks.iter_mut() {
41                hook.before_event(&ev);
42            }
43            func(ev)
44        })
45    }
46
47    /// Send SIGTERM to all children and wait for them to exit.
48    pub fn cleanup(&mut self) -> io::Result<()> {
49        let mut child_count = 0;
50        for pid in self.inner.pids() {
51            child_count += 1;
52            unsafe { libc::kill(pid.inner as i32, libc::SIGTERM) };
53        }
54
55        if child_count > 0 {
56            self.pump(|ev| {
57                if let Event::ChildTerminated { .. } = ev {
58                    child_count -= 1;
59                    if child_count == 0 {
60                        return Some(());
61                    }
62                }
63                None
64            });
65        }
66        Ok(())
67    }
68
69    /// Create a new child process that inherits stdin, stdout, and stderr and
70    /// is a member of the same process group.
71    pub fn control(&mut self, cmd: Command) -> io::Result<ChildInfo> {
72        let child = self.spawn(cmd)?;
73        Ok(child)
74    }
75
76    pub fn forward(&mut self, mut cmd: Command) -> io::Result<ChildInfo> {
77        cmd.stdout(Stdio::piped());
78        cmd.stderr(Stdio::piped());
79        cmd.process_group(0);
80        let child = self.spawn(cmd)?;
81        Ok(child)
82    }
83
84    fn spawn(&mut self, cmd: Command) -> io::Result<ChildInfo> {
85        for hook in self.hooks.iter_mut() {
86            hook.before_spawn(&cmd);
87        }
88
89        self.inner.spawn(cmd)
90    }
91
92    pub fn wait_for_signal(&mut self) -> Signal {
93        use Event::*;
94        self.pump(|ev| match ev {
95            SignalReceived { signal } => Some(signal),
96            _ => None,
97        })
98    }
99
100    pub fn wait_for_match(&mut self, child_info: &ChildInfo, re: Regex) -> Result<()> {
101        use Event::*;
102        if let Some(exit_status) = child_info.exit_status() {
103            return Err(Error::UnexpectedChildTermination {
104                pid: child_info.pid,
105                prog_path: PathBuf::from(child_info.program()),
106                exit_status,
107            });
108        }
109        self.pump(|ev| match ev {
110            ChildTerminated {
111                pid,
112                exit_status,
113                prog_path,
114            } if pid == child_info.pid => Some(Err(Error::UnexpectedChildTermination {
115                pid,
116                prog_path: PathBuf::from(prog_path),
117                exit_status,
118            })),
119            ChildWrote { pid, line, .. } if pid == child_info.pid && re.is_match(line) => {
120                Some(Ok(()))
121            }
122            // todo: watch for stdout and stderr closing. We need to know the
123            // initial state though.
124            FdClosed { .. } => None,
125            SignalReceived { signal } => Some(Err(Error::from(signal))),
126            _ => None,
127        })
128    }
129
130    pub fn wait(&mut self, child_info: &ChildInfo) -> Result<ExitStatus> {
131        use Event::*;
132        if let Some(exit_status) = child_info.exit_status() {
133            return Ok(exit_status);
134        }
135        self.pump(|ev| match ev {
136            ChildTerminated {
137                pid, exit_status, ..
138            } if pid == child_info.pid => Some(Ok(exit_status)),
139            SignalReceived { signal } => Some(Err(Error::from(signal))),
140            _ => None,
141        })
142    }
143}
144
145#[derive(Debug)]
146pub enum Error {
147    UnexpectedChildTermination {
148        pid: Pid,
149        prog_path: PathBuf,
150        exit_status: ExitStatus,
151    },
152    UnexpectedSignal {
153        signal: Signal,
154    },
155}
156
157pub type Result<A> = std::result::Result<A, Error>;
158
159impl From<Signal> for Error {
160    fn from(signal: Signal) -> Self {
161        Error::UnexpectedSignal { signal }
162    }
163}
164
165pub struct PrintInfo<Stdout: Write, Stderr: Write> {
166    pub stdout: Stdout,
167    pub stderr: Stderr,
168    pub info_style: Style,
169    pub stdout_style: Style,
170    pub stderr_style: Style,
171}
172
173impl PrintInfo<LineWriter<io::Stdout>, LineWriter<io::Stderr>> {
174    pub fn new() -> Self {
175        Self {
176            stdout: LineWriter::new(stdout()),
177            stderr: LineWriter::new(stderr()),
178            info_style: Style::new().green(),
179            stdout_style: Style::new().white(),
180            stderr_style: Style::new().white(),
181        }
182    }
183}
184
185impl<Stdout: Write, Stderr: Write> MuxerHook for PrintInfo<Stdout, Stderr> {
186    fn before_event<'a>(&mut self, ev: &Event<'a>) {
187        match ev {
188            Event::ChildTerminated {
189                prog_path,
190                exit_status,
191                ..
192            } => {
193                writeln!(
194                    &mut self.stdout,
195                    "{}{} {} {}{}",
196                    self.info_style.apply_to("["),
197                    self.info_style.apply_to(prog_path.display()),
198                    self.info_style.apply_to("terminated with"),
199                    self.info_style.apply_to(exit_status),
200                    self.info_style.apply_to("]"),
201                )
202                .unwrap();
203            }
204            Event::ChildWrote {
205                prog_path,
206                tag,
207                line,
208                ..
209            } => {
210                let forward_style = match tag {
211                    FdTag::Stdout => &self.stdout_style,
212                    FdTag::Stderr => &self.stderr_style,
213                };
214                let output: &mut dyn Write = match tag {
215                    FdTag::Stdout => &mut self.stdout,
216                    FdTag::Stderr => &mut self.stderr,
217                };
218                write!(
219                    output,
220                    "{}{}{} {}",
221                    self.info_style.apply_to("["),
222                    self.info_style.apply_to(&prog_path.display()),
223                    self.info_style.apply_to("]"),
224                    forward_style.apply_to(line),
225                )
226                .unwrap();
227            }
228            Event::FdClosed { prog_path, tag, .. } => {
229                let handle: &str = match tag {
230                    FdTag::Stderr => "stderr",
231                    FdTag::Stdout => "stdout",
232                };
233                writeln!(
234                    &mut self.stdout,
235                    "{}{} {} {}{}",
236                    self.info_style.apply_to("["),
237                    self.info_style.apply_to(prog_path.display()),
238                    self.info_style.apply_to("closed"),
239                    self.info_style.apply_to(handle),
240                    self.info_style.apply_to("]"),
241                )
242                .unwrap();
243            }
244            Event::SignalReceived { ref signal } => {
245                let signal = match signal {
246                    Signal::Hangup => "hangup (SIGHUP)",
247                    Signal::Interrupt => "interrupt (SIGINT)",
248                    Signal::Terminate => "terminate (SIGTERM)",
249                };
250                writeln!(
251                    &mut self.stdout,
252                    "{}{} {}{}",
253                    self.info_style.apply_to("["),
254                    self.info_style.apply_to("Received signal: "),
255                    self.info_style.apply_to(signal),
256                    self.info_style.apply_to("]"),
257                )
258                .unwrap();
259            }
260        }
261    }
262
263    fn before_spawn(&mut self, cmd: &Command) {
264        let prog_path: &Path = Path::new(cmd.get_program());
265        writeln!(
266            &mut self.stdout,
267            "{} {}{}",
268            self.info_style.apply_to("[Running"),
269            self.info_style.apply_to(prog_path.display()),
270            self.info_style.apply_to("]")
271        )
272        .unwrap();
273    }
274}