process_muxer_core/muxer/
mod.rs

1mod process;
2pub(crate) mod source;
3pub use process::pid::Pid;
4use source::termination::ChildTerminationSource;
5use std::{
6    cell::Cell,
7    collections::BTreeMap,
8    io::{self, BufRead, ErrorKind},
9    mem,
10    path::{Path, PathBuf},
11    process::{Child, ChildStdin, Command, ExitStatus},
12    rc::Rc,
13};
14
15use mio::{
16    event::{self, Source},
17    Events, Interest, Poll, Token,
18};
19use slab::Slab;
20
21pub use self::source::childout::FdTag;
22use self::source::{childout::ChildOut, SourceInstruction};
23
24#[cfg(feature = "signals")]
25use self::source::EventStream;
26#[cfg(feature = "signals")]
27use source::signal::{Signal, SignalSource};
28
29/// A handle to a child process that was spawned with `Muxer`.
30pub struct ChildInfo {
31    pub pid: Pid,
32    pub stdin: Option<ChildStdin>,
33    prog_path: Rc<PathBuf>,
34    exit_status: Rc<Cell<Option<ExitStatus>>>,
35}
36
37impl ChildInfo {
38    pub fn program(&self) -> &Path {
39        &self.prog_path
40    }
41
42    pub fn exit_status(&self) -> Option<ExitStatus> {
43        self.exit_status.get()
44    }
45}
46
/// A user-facing event emitted by the `Muxer`
///
/// Events are handed to the callback passed to `Muxer::pump`. Borrowed fields
/// (`prog_path`, `line`) are only valid for the lifetime `'a`, i.e. for the
/// duration of that callback invocation.
#[derive(Debug)]
pub enum Event<'a> {
    /// A child process has terminated.
    ChildTerminated {
        /// Pid of the terminated child.
        pid: Pid,
        /// Program path the child was spawned from.
        prog_path: &'a Path,
        /// Exit status reported for the child.
        exit_status: ExitStatus,
    },
    /// Text was read from one of a child's output pipes.
    ChildWrote {
        /// Pid of the child that produced the output.
        pid: Pid,
        /// Program path the child was spawned from.
        prog_path: &'a Path,
        /// Tag of the pipe the text was read from.
        // NOTE(review): presumably distinguishes stdout from stderr — confirm
        // against `source::childout::FdTag`.
        tag: FdTag,
        /// Contents of the pipe's line buffer after a successful `read_line`,
        /// so it normally still ends with the newline the child wrote.
        line: &'a str,
    },
    /// Reading one of a child's output pipes reported end-of-file; the pipe
    /// has been deregistered from the poll registry.
    FdClosed {
        /// Pid of the child whose pipe was closed.
        pid: Pid,
        /// Program path the child was spawned from.
        prog_path: &'a Path,
        /// Tag of the pipe that was closed.
        tag: FdTag,
    },
    /// A signal was produced by the signal source (only with the `signals`
    /// feature enabled).
    #[cfg(feature = "signals")]
    SignalReceived { signal: Signal },
}
69
/// A process Muxer
///
/// Owns a `mio` poll instance together with the event sources registered on
/// it (child output pipes, the child-termination source and, with the
/// `signals` feature, a signal source) and turns their readiness into
/// `Event`s delivered through `pump`.
pub struct Muxer {
    /// The `mio` poll instance all event sources are registered with.
    poll: Poll,
    /// Buffer that `poll.poll` fills with readiness events.
    events: Events,
    /// Spawned children, keyed by pid.
    children: BTreeMap<Pid, MuxerChild>,
    /// Registered event sources; a source's slab key doubles as its `mio`
    /// token.
    fds: Slab<EventSource>,
    /// Where `pump` left off, so that a later call resumes draining the same
    /// source.
    state: State,
    /// Terminated children (pid, program path, exit status) that still have
    /// to be reported as `Event::ChildTerminated`.
    wait_buffer: Vec<(Pid, Rc<PathBuf>, ExitStatus)>,
    // We don't need this field, an index into "events" would do, but the Events
    // type only exposes an iterator over references
    /// Readiness events copied out of `events` that have not been handled
    /// yet; `pump` pops them one at a time.
    pending_events: Vec<event::Event>,
}
82
83impl Muxer {
84    pub fn new() -> io::Result<Self> {
85        let mut res = Self {
86            poll: Poll::new()?,
87            wait_buffer: Vec::new(),
88            events: Events::with_capacity(1024),
89            children: BTreeMap::new(),
90            fds: Slab::new(),
91            state: State::Awaiting,
92            pending_events: Vec::new(),
93        };
94
95        let wait_source = ChildTerminationSource::new()?;
96        res.register(EventSource::ChildTerminated(wait_source));
97        #[cfg(feature = "signals")]
98        {
99            let signal_source = SignalSource::new()?;
100            res.register(EventSource::ReceivedSignal(signal_source));
101        }
102        Ok(res)
103    }
104
105    pub fn pids(&self) -> impl Iterator<Item = &Pid> {
106        self.children.keys()
107    }
108
109    pub fn spawn(&mut self, mut cmd: Command) -> io::Result<ChildInfo> {
110        let prog_path = PathBuf::from(cmd.get_program());
111
112        let mut child = cmd.spawn()?;
113        let pid = Pid { inner: child.id() };
114        let registry = self.poll.registry();
115        let prog_path = Rc::new(prog_path);
116
117        let child_info = ChildInfo {
118            pid,
119            stdin: child.stdin.take(),
120            prog_path: prog_path.clone(),
121            exit_status: Rc::new(Cell::new(None)),
122        };
123
124        if let Some(stdout) = child.stdout.take() {
125            let prog_path = prog_path.clone();
126            let mut stdout = ChildOut::from_pipe(stdout, pid, prog_path);
127            let entry = self.fds.vacant_entry();
128            registry.register(&mut stdout, Token(entry.key()), Interest::READABLE)?;
129            entry.insert(EventSource::ReadableChild(stdout));
130        }
131
132        if let Some(stderr) = child.stderr.take() {
133            let prog_path = prog_path.clone();
134            let mut stderr = ChildOut::from_pipe(stderr, pid, prog_path);
135            let entry = self.fds.vacant_entry();
136            registry.register(&mut stderr, Token(entry.key()), Interest::READABLE)?;
137            entry.insert(EventSource::ReadableChild(stderr));
138        }
139
140        let muxer_child = MuxerChild {
141            child,
142            prog_path: prog_path.clone(),
143            exit_status: child_info.exit_status.clone(),
144        };
145
146        self.children.insert(pid, muxer_child);
147        Ok(child_info)
148    }
149
150    fn register(&mut self, mut evsrc: EventSource) {
151        let entry = self.fds.vacant_entry();
152        evsrc
153            .register(self.poll.registry(), Token(entry.key()), Interest::READABLE)
154            .unwrap();
155        entry.insert(evsrc);
156    }
157
158    fn reregister(&mut self, mut evsrc: EventSource) {
159        let entry = self.fds.vacant_entry();
160        evsrc
161            .reregister(self.poll.registry(), Token(entry.key()), Interest::READABLE)
162            .unwrap();
163        entry.insert(evsrc);
164    }
165
166    fn deregister(&mut self, mut evsrc: EventSource) {
167        evsrc.deregister(self.poll.registry()).unwrap();
168    }
169
170    pub fn pump<R, F>(&mut self, mut func: F) -> R
171    where
172        F: FnMut(Event) -> Option<R>,
173    {
174        let mut state = mem::replace(&mut self.state, State::Awaiting);
175        let (state, event) = loop {
176            match state {
177                State::Awaiting => match self.pending_events.pop() {
178                    None => {
179                        // fill our events buffer
180                        loop {
181                            match self.poll.poll(&mut self.events, None) {
182                                Ok(()) => break,
183                                Err(e) => match e.kind() {
184                                    // if our poll is interrupted by a
185                                    // system call then retry
186                                    ErrorKind::Interrupted => continue,
187                                    _ => panic!("Unexpected error during poll: {e}"),
188                                },
189                            }
190                        }
191                        // Since events buffer is opaque, we cannot suspend our iteration through it
192                        // easily. So, we copy the contents to a vector that we can pop from.
193                        self.pending_events.extend(self.events.iter().cloned());
194
195                        self.events.clear();
196                    }
197                    // We have some event to handle. In these cases we
198                    // potentially have many events to handle before
199                    // reregistering the handle, but pump doesn't assume we are
200                    // prepared to handle them all before returning, so we
201                    // transition the state from Awaiting to a resource specific
202                    // state representing draining all pending events of some
203                    // type before reregistering the underlying fd.
204                    Some(ev) => match self.fds.remove(ev.token().0) {
205                        EventSource::ChildTerminated(mut w) => {
206                            match w.handle_event(&mut self.children, &mut self.wait_buffer) {
207                                SourceInstruction::Reregister => {
208                                    self.reregister(EventSource::ChildTerminated(w));
209                                    state = State::DrainingChildTerminated;
210                                }
211                                SourceInstruction::Deregister => {
212                                    self.deregister(EventSource::ChildTerminated(w));
213                                }
214                            }
215                        }
216                        EventSource::ReadableChild(child_out) => {
217                            state = State::DrainingChildOut(child_out);
218                        }
219                        #[cfg(feature = "signals")]
220                        EventSource::ReceivedSignal(signal_source) => {
221                            state = State::DrainingSignals(signal_source);
222                        }
223                    },
224                },
225                State::DrainingChildTerminated => match self.wait_buffer.pop() {
226                    None => state = State::Awaiting,
227                    Some((pid, prog_path, exit_status)) => {
228                        let event = Event::ChildTerminated {
229                            pid,
230                            prog_path: &prog_path,
231                            exit_status,
232                        };
233                        match func(event) {
234                            None => state = State::DrainingChildTerminated,
235                            Some(r) => {
236                                break (State::DrainingChildTerminated, r);
237                            }
238                        }
239                    }
240                },
241                State::DrainingChildOut(mut child_out) => {
242                    let fd = &mut child_out.fd;
243                    let buf: &mut String = &mut child_out.buf;
244                    match fd.read_line(buf) {
245                        Ok(0) => {
246                            // The fd was closed; we must deregister the fd and
247                            // return to the awaiting state.
248                            child_out
249                                .fd
250                                .get_mut()
251                                .deregister(self.poll.registry())
252                                .unwrap();
253                            let event = Event::FdClosed {
254                                pid: child_out.pid,
255                                tag: child_out.tag,
256                                prog_path: &child_out.prog_path,
257                            };
258                            match func(event) {
259                                None => state = State::Awaiting,
260                                Some(r) => {
261                                    break (State::Awaiting, r);
262                                }
263                            }
264                        }
265                        Ok(_) => {
266                            let event = Event::ChildWrote {
267                                pid: child_out.pid,
268                                tag: child_out.tag,
269                                prog_path: &child_out.prog_path,
270                                line: buf,
271                            };
272                            let ores = func(event);
273                            buf.clear();
274                            match ores {
275                                None => state = State::DrainingChildOut(child_out),
276                                Some(r) => {
277                                    break (State::DrainingChildOut(child_out), r);
278                                }
279                            }
280                        }
281                        // maybe we want to break in the future if we start
282                        // listening for SIGALRM
283                        Err(e) if e.kind() == ErrorKind::Interrupted => {
284                            state = State::DrainingChildOut(child_out)
285                        }
286                        Err(e) if e.kind() == ErrorKind::WouldBlock => {
287                            self.reregister(EventSource::ReadableChild(child_out));
288                            state = State::Awaiting;
289                        }
290                        Err(e) => panic!("Unexpected error when reading child output: {e}"),
291                    }
292                }
293                #[cfg(feature = "signals")]
294                State::DrainingSignals(mut signal_source) => match signal_source.next() {
295                    EventStream::Emit(signal) => {
296                        let event = Event::SignalReceived { signal };
297                        match func(event) {
298                            Some(r) => {
299                                break (State::DrainingSignals(signal_source), r);
300                            }
301                            None => state = State::DrainingSignals(signal_source),
302                        }
303                    }
304                    EventStream::Drained(source_instruction) => {
305                        let event_source = EventSource::ReceivedSignal(signal_source);
306                        match source_instruction {
307                            SourceInstruction::Reregister => self.reregister(event_source),
308                            SourceInstruction::Deregister => self.deregister(event_source),
309                        }
310                        state = State::Awaiting;
311                    }
312                },
313            }
314        };
315        self.state = state;
316        event
317    }
318}
319
/// The kinds of event source the muxer stores in its slab and registers with
/// the poll registry.
enum EventSource {
    /// An output pipe of a spawned child.
    ReadableChild(ChildOut),
    /// The source that signals child termination.
    ChildTerminated(ChildTerminationSource),
    /// The source that delivers signals (only with the `signals` feature).
    #[cfg(feature = "signals")]
    ReceivedSignal(SignalSource),
}
326
327impl Source for EventSource {
328    fn register(
329        &mut self,
330        registry: &mio::Registry,
331        token: Token,
332        interests: Interest,
333    ) -> io::Result<()> {
334        match self {
335            EventSource::ReadableChild(x) => x.register(registry, token, interests),
336            EventSource::ChildTerminated(x) => x.register(registry, token, interests),
337            #[cfg(feature = "signals")]
338            EventSource::ReceivedSignal(x) => x.register(registry, token, interests),
339        }
340    }
341
342    fn reregister(
343        &mut self,
344        registry: &mio::Registry,
345        token: Token,
346        interests: Interest,
347    ) -> io::Result<()> {
348        match self {
349            EventSource::ReadableChild(x) => x.reregister(registry, token, interests),
350            EventSource::ChildTerminated(x) => x.reregister(registry, token, interests),
351            #[cfg(feature = "signals")]
352            EventSource::ReceivedSignal(x) => x.reregister(registry, token, interests),
353        }
354    }
355
356    fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
357        match self {
358            EventSource::ReadableChild(x) => x.deregister(registry),
359            EventSource::ChildTerminated(x) => x.deregister(registry),
360            #[cfg(feature = "signals")]
361            EventSource::ReceivedSignal(x) => x.deregister(registry),
362        }
363    }
364}
365
/// Resumable position of `Muxer::pump`.
///
/// `pump` may return to its caller in the middle of draining a source; the
/// state (including the source being drained, where applicable) is stored on
/// the muxer so the next call continues from the same point.
#[derive(Debug)]
enum State {
    /// No source is being drained; `pump` handles the next pending readiness
    /// event, polling for more when none are left.
    Awaiting,
    /// Reading from the given child output pipe until it would block or
    /// reaches end-of-file.
    DrainingChildOut(ChildOut),
    /// Reporting the terminated children queued in the muxer's wait buffer.
    DrainingChildTerminated,
    /// Reporting signals from the given signal source until it is drained
    /// (only with the `signals` feature).
    #[cfg(feature = "signals")]
    DrainingSignals(SignalSource),
}
374
/// The muxer's own record of a spawned child, stored in `Muxer::children`
/// under the child's pid.
pub struct MuxerChild {
    /// Handle to the spawned process (its stdin/stdout/stderr handles have
    /// been taken out by `Muxer::spawn`).
    child: Child,
    /// Program path, shared with the `ChildInfo` handed to the caller.
    prog_path: Rc<PathBuf>,
    /// Exit-status cell, shared with the `ChildInfo` handed to the caller.
    exit_status: Rc<Cell<Option<ExitStatus>>>,
}