process_muxer_core/muxer/
mod.rs1mod process;
2pub(crate) mod source;
3pub use process::pid::Pid;
4use source::termination::ChildTerminationSource;
5use std::{
6 cell::Cell,
7 collections::BTreeMap,
8 io::{self, BufRead, ErrorKind},
9 mem,
10 path::{Path, PathBuf},
11 process::{Child, ChildStdin, Command, ExitStatus},
12 rc::Rc,
13};
14
15use mio::{
16 event::{self, Source},
17 Events, Interest, Poll, Token,
18};
19use slab::Slab;
20
21pub use self::source::childout::FdTag;
22use self::source::{childout::ChildOut, SourceInstruction};
23
24#[cfg(feature = "signals")]
25use self::source::EventStream;
26#[cfg(feature = "signals")]
27use source::signal::{Signal, SignalSource};
28
29pub struct ChildInfo {
31 pub pid: Pid,
32 pub stdin: Option<ChildStdin>,
33 prog_path: Rc<PathBuf>,
34 exit_status: Rc<Cell<Option<ExitStatus>>>,
35}
36
37impl ChildInfo {
38 pub fn program(&self) -> &Path {
39 &self.prog_path
40 }
41
42 pub fn exit_status(&self) -> Option<ExitStatus> {
43 self.exit_status.get()
44 }
45}
46
47#[derive(Debug)]
49pub enum Event<'a> {
50 ChildTerminated {
51 pid: Pid,
52 prog_path: &'a Path,
53 exit_status: ExitStatus,
54 },
55 ChildWrote {
56 pid: Pid,
57 prog_path: &'a Path,
58 tag: FdTag,
59 line: &'a str,
60 },
61 FdClosed {
62 pid: Pid,
63 prog_path: &'a Path,
64 tag: FdTag,
65 },
66 #[cfg(feature = "signals")]
67 SignalReceived { signal: Signal },
68}
69
70pub struct Muxer {
72 poll: Poll,
73 events: Events,
74 children: BTreeMap<Pid, MuxerChild>,
75 fds: Slab<EventSource>,
76 state: State,
77 wait_buffer: Vec<(Pid, Rc<PathBuf>, ExitStatus)>,
78 pending_events: Vec<event::Event>,
81}
82
83impl Muxer {
84 pub fn new() -> io::Result<Self> {
85 let mut res = Self {
86 poll: Poll::new()?,
87 wait_buffer: Vec::new(),
88 events: Events::with_capacity(1024),
89 children: BTreeMap::new(),
90 fds: Slab::new(),
91 state: State::Awaiting,
92 pending_events: Vec::new(),
93 };
94
95 let wait_source = ChildTerminationSource::new()?;
96 res.register(EventSource::ChildTerminated(wait_source));
97 #[cfg(feature = "signals")]
98 {
99 let signal_source = SignalSource::new()?;
100 res.register(EventSource::ReceivedSignal(signal_source));
101 }
102 Ok(res)
103 }
104
105 pub fn pids(&self) -> impl Iterator<Item = &Pid> {
106 self.children.keys()
107 }
108
109 pub fn spawn(&mut self, mut cmd: Command) -> io::Result<ChildInfo> {
110 let prog_path = PathBuf::from(cmd.get_program());
111
112 let mut child = cmd.spawn()?;
113 let pid = Pid { inner: child.id() };
114 let registry = self.poll.registry();
115 let prog_path = Rc::new(prog_path);
116
117 let child_info = ChildInfo {
118 pid,
119 stdin: child.stdin.take(),
120 prog_path: prog_path.clone(),
121 exit_status: Rc::new(Cell::new(None)),
122 };
123
124 if let Some(stdout) = child.stdout.take() {
125 let prog_path = prog_path.clone();
126 let mut stdout = ChildOut::from_pipe(stdout, pid, prog_path);
127 let entry = self.fds.vacant_entry();
128 registry.register(&mut stdout, Token(entry.key()), Interest::READABLE)?;
129 entry.insert(EventSource::ReadableChild(stdout));
130 }
131
132 if let Some(stderr) = child.stderr.take() {
133 let prog_path = prog_path.clone();
134 let mut stderr = ChildOut::from_pipe(stderr, pid, prog_path);
135 let entry = self.fds.vacant_entry();
136 registry.register(&mut stderr, Token(entry.key()), Interest::READABLE)?;
137 entry.insert(EventSource::ReadableChild(stderr));
138 }
139
140 let muxer_child = MuxerChild {
141 child,
142 prog_path: prog_path.clone(),
143 exit_status: child_info.exit_status.clone(),
144 };
145
146 self.children.insert(pid, muxer_child);
147 Ok(child_info)
148 }
149
150 fn register(&mut self, mut evsrc: EventSource) {
151 let entry = self.fds.vacant_entry();
152 evsrc
153 .register(self.poll.registry(), Token(entry.key()), Interest::READABLE)
154 .unwrap();
155 entry.insert(evsrc);
156 }
157
158 fn reregister(&mut self, mut evsrc: EventSource) {
159 let entry = self.fds.vacant_entry();
160 evsrc
161 .reregister(self.poll.registry(), Token(entry.key()), Interest::READABLE)
162 .unwrap();
163 entry.insert(evsrc);
164 }
165
166 fn deregister(&mut self, mut evsrc: EventSource) {
167 evsrc.deregister(self.poll.registry()).unwrap();
168 }
169
170 pub fn pump<R, F>(&mut self, mut func: F) -> R
171 where
172 F: FnMut(Event) -> Option<R>,
173 {
174 let mut state = mem::replace(&mut self.state, State::Awaiting);
175 let (state, event) = loop {
176 match state {
177 State::Awaiting => match self.pending_events.pop() {
178 None => {
179 loop {
181 match self.poll.poll(&mut self.events, None) {
182 Ok(()) => break,
183 Err(e) => match e.kind() {
184 ErrorKind::Interrupted => continue,
187 _ => panic!("Unexpected error during poll: {e}"),
188 },
189 }
190 }
191 self.pending_events.extend(self.events.iter().cloned());
194
195 self.events.clear();
196 }
197 Some(ev) => match self.fds.remove(ev.token().0) {
205 EventSource::ChildTerminated(mut w) => {
206 match w.handle_event(&mut self.children, &mut self.wait_buffer) {
207 SourceInstruction::Reregister => {
208 self.reregister(EventSource::ChildTerminated(w));
209 state = State::DrainingChildTerminated;
210 }
211 SourceInstruction::Deregister => {
212 self.deregister(EventSource::ChildTerminated(w));
213 }
214 }
215 }
216 EventSource::ReadableChild(child_out) => {
217 state = State::DrainingChildOut(child_out);
218 }
219 #[cfg(feature = "signals")]
220 EventSource::ReceivedSignal(signal_source) => {
221 state = State::DrainingSignals(signal_source);
222 }
223 },
224 },
225 State::DrainingChildTerminated => match self.wait_buffer.pop() {
226 None => state = State::Awaiting,
227 Some((pid, prog_path, exit_status)) => {
228 let event = Event::ChildTerminated {
229 pid,
230 prog_path: &prog_path,
231 exit_status,
232 };
233 match func(event) {
234 None => state = State::DrainingChildTerminated,
235 Some(r) => {
236 break (State::DrainingChildTerminated, r);
237 }
238 }
239 }
240 },
241 State::DrainingChildOut(mut child_out) => {
242 let fd = &mut child_out.fd;
243 let buf: &mut String = &mut child_out.buf;
244 match fd.read_line(buf) {
245 Ok(0) => {
246 child_out
249 .fd
250 .get_mut()
251 .deregister(self.poll.registry())
252 .unwrap();
253 let event = Event::FdClosed {
254 pid: child_out.pid,
255 tag: child_out.tag,
256 prog_path: &child_out.prog_path,
257 };
258 match func(event) {
259 None => state = State::Awaiting,
260 Some(r) => {
261 break (State::Awaiting, r);
262 }
263 }
264 }
265 Ok(_) => {
266 let event = Event::ChildWrote {
267 pid: child_out.pid,
268 tag: child_out.tag,
269 prog_path: &child_out.prog_path,
270 line: buf,
271 };
272 let ores = func(event);
273 buf.clear();
274 match ores {
275 None => state = State::DrainingChildOut(child_out),
276 Some(r) => {
277 break (State::DrainingChildOut(child_out), r);
278 }
279 }
280 }
281 Err(e) if e.kind() == ErrorKind::Interrupted => {
284 state = State::DrainingChildOut(child_out)
285 }
286 Err(e) if e.kind() == ErrorKind::WouldBlock => {
287 self.reregister(EventSource::ReadableChild(child_out));
288 state = State::Awaiting;
289 }
290 Err(e) => panic!("Unexpected error when reading child output: {e}"),
291 }
292 }
293 #[cfg(feature = "signals")]
294 State::DrainingSignals(mut signal_source) => match signal_source.next() {
295 EventStream::Emit(signal) => {
296 let event = Event::SignalReceived { signal };
297 match func(event) {
298 Some(r) => {
299 break (State::DrainingSignals(signal_source), r);
300 }
301 None => state = State::DrainingSignals(signal_source),
302 }
303 }
304 EventStream::Drained(source_instruction) => {
305 let event_source = EventSource::ReceivedSignal(signal_source);
306 match source_instruction {
307 SourceInstruction::Reregister => self.reregister(event_source),
308 SourceInstruction::Deregister => self.deregister(event_source),
309 }
310 state = State::Awaiting;
311 }
312 },
313 }
314 };
315 self.state = state;
316 event
317 }
318}
319
320enum EventSource {
321 ReadableChild(ChildOut),
322 ChildTerminated(ChildTerminationSource),
323 #[cfg(feature = "signals")]
324 ReceivedSignal(SignalSource),
325}
326
327impl Source for EventSource {
328 fn register(
329 &mut self,
330 registry: &mio::Registry,
331 token: Token,
332 interests: Interest,
333 ) -> io::Result<()> {
334 match self {
335 EventSource::ReadableChild(x) => x.register(registry, token, interests),
336 EventSource::ChildTerminated(x) => x.register(registry, token, interests),
337 #[cfg(feature = "signals")]
338 EventSource::ReceivedSignal(x) => x.register(registry, token, interests),
339 }
340 }
341
342 fn reregister(
343 &mut self,
344 registry: &mio::Registry,
345 token: Token,
346 interests: Interest,
347 ) -> io::Result<()> {
348 match self {
349 EventSource::ReadableChild(x) => x.reregister(registry, token, interests),
350 EventSource::ChildTerminated(x) => x.reregister(registry, token, interests),
351 #[cfg(feature = "signals")]
352 EventSource::ReceivedSignal(x) => x.reregister(registry, token, interests),
353 }
354 }
355
356 fn deregister(&mut self, registry: &mio::Registry) -> io::Result<()> {
357 match self {
358 EventSource::ReadableChild(x) => x.deregister(registry),
359 EventSource::ChildTerminated(x) => x.deregister(registry),
360 #[cfg(feature = "signals")]
361 EventSource::ReceivedSignal(x) => x.deregister(registry),
362 }
363 }
364}
365
366#[derive(Debug)]
367enum State {
368 Awaiting,
369 DrainingChildOut(ChildOut),
370 DrainingChildTerminated,
371 #[cfg(feature = "signals")]
372 DrainingSignals(SignalSource),
373}
374
375pub struct MuxerChild {
376 child: Child,
377 prog_path: Rc<PathBuf>,
378 exit_status: Rc<Cell<Option<ExitStatus>>>,
379}