calloop_subproc/
listen.rs

1use super::{Command, CommandResult, ErrorEvent, LaunchError};
2use calloop::{transient::TransientSource, PostAction};
3use futures::{io::BufReader, AsyncBufReadExt, StreamExt};
4
5/// An event source that runs a subprocess and generates events for lines of
6/// output.
7///
8/// This event source will run a command as a subprocess, generate
9/// [`ListenEvent::Line`] events for every line it prints to stdout, and one
10/// final [`ListenEvent::End`] event when it ends.
11///
12/// Processes that run indefinitely can be killed (using `SIGKILL` ie. 9 on
13/// Linux) by returning `true` from the callback provided to `process_events()`
14/// or by calling the [`kill()`](Self::kill) method. Note that there may be more
15/// `Line` events generated after this, depending on the order in which the kill
16/// request and remaining output are processed.
17///
18/// After the subprocess has been ended and the final `End` event has been
19/// delivered, this event source should be removed from the event loop.
20pub struct SubprocListen {
21    /// The executor (event source) that actually runs the command and generates
22    /// events for us.
23    executor: calloop::futures::Executor<CommandResult>,
24
25    /// When the executor is finished, we need to stash its result until the
26    /// output is finished.
27    outcome: Option<Result<(), ErrorEvent>>,
28
29    /// Use a transient source because the sender can be dropped by the async
30    /// executor.
31    receiver: TransientSource<calloop::channel::Channel<ListenEvent>>,
32
33    /// Used to stop the subprocess before the end of the (potentially infinite)
34    /// stream of stdout.
35    stopper: Option<futures::channel::oneshot::Sender<()>>,
36
37    /// Set to true when all of the output has been received.
38    output_finished: bool,
39}
40
41impl SubprocListen {
42    /// Takes a command and schedules a subprocess to be run asychronously.
43    ///
44    /// See the note on the root module regarding the trait bounds if they're
45    /// confusing.
46    pub fn new(command: Command) -> calloop::Result<Self> {
47        let (executor, scheduler) = calloop::futures::executor()?;
48        let (receiver, stopper) = Self::schedule_command(scheduler, command)?;
49
50        Ok(Self {
51            executor,
52            outcome: None,
53            receiver: receiver.into(),
54            stopper: Some(stopper),
55            output_finished: false,
56        })
57    }
58
59    pub fn kill(&mut self) {
60        if let Some(stopper) = self.stopper.take() {
61            stopper
62                .send(())
63                .expect("Could not send internal message to stop subprocess");
64        }
65
66        // Otherwise the sender has been dropped, which will happen shortly
67        // before the executor finishes and returns the status.
68    }
69
70    /// Schedules a single command in our async executor, and also sets up a
71    /// channel to send buffered output back. Sending `()` over the `oneshot`
72    /// channel will cause the underlying subprocess to be killed.
73    fn schedule_command(
74        scheduler: calloop::futures::Scheduler<CommandResult>,
75        command: Command,
76    ) -> std::io::Result<(
77        calloop::channel::Channel<ListenEvent>,
78        futures::channel::oneshot::Sender<()>,
79    )> {
80        let command_debug_str_for_here = format!("{:?}", command);
81
82        let (sender, receiver) = calloop::channel::channel();
83        let (stopper, stop_rx) = futures::channel::oneshot::channel();
84
85        let async_exec = subproc_listener(command, sender, stop_rx);
86
87        scheduler.schedule(async_exec).map_err(|_| {
88            std::io::Error::new(
89                std::io::ErrorKind::InvalidData,
90                format!(
91                    "Could not schedule command: {:?}",
92                    command_debug_str_for_here
93                ),
94            )
95        })?;
96
97        Ok((receiver, stopper))
98    }
99}
100
101impl calloop::EventSource for SubprocListen {
102    type Event = ListenEvent;
103
104    type Metadata = ();
105
106    // Callback should return `true` to end the process.
107    type Ret = bool;
108
109    type Error = LaunchError;
110
111    fn process_events<F>(
112        &mut self,
113        readiness: calloop::Readiness,
114        token: calloop::Token,
115        mut callback: F,
116    ) -> Result<calloop::PostAction, Self::Error>
117    where
118        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
119    {
120        // The executor will provide an event when the subprocess is finished,
121        // but we need to be careful! It is possible to see the executor finish
122        // before we actually receive all the output from the command, because
123        // they arrive via two different event sources. We need to make sure
124        // that we get all of the output before finishing.
125
126        self.executor
127            .process_events(readiness, token, |cmd_res, _| {
128                // At this point:
129                // - the scheduler has been dropped (it was dropped after the
130                //   constructor finished)
131                // - the future being run by the executor has finished (by
132                //   definition of execution reaching here)
133                // - the sending end of self.receiver has been dropped (but note
134                //   the channel itself may still have messages that might need
135                //   processing!)
136                // - the subprocess has been killed and will be cleaned up up by
137                //   async_process
138                // - we provide no way to schedule more futures in the executor
139                //
140                // So we set self.outcome and wait for the output message
141                // channel to reach its end.
142
143                // The command has already ended, so we don't care about the
144                // command string except to log it.
145                let CommandResult { command, result } = cmd_res;
146                log::trace!("Subprocess ended: {}", command);
147
148                // Drop the stopper, we can't use it any more.
149                self.stopper.take();
150
151                self.outcome = Some(result);
152            })?;
153
154        // We need to keep track of whether any callback call returns true, not
155        // just the last one.
156        let mut kill = false;
157
158        let channel_post_action = self.receiver.process_events(readiness, token, |msg, _| {
159            match msg {
160                calloop::channel::Event::Msg(event) => {
161                    let this_kill = callback(event, &mut ());
162                    // Avoids problems with a 'true' being followed by a 'false' if
163                    // `process_events()` calls the callback multiple times in one
164                    // go.
165                    kill = kill || this_kill;
166                }
167                calloop::channel::Event::Closed => self.output_finished = true,
168            }
169        })?;
170
171        if kill {
172            self.kill();
173        }
174
175        // Send the final event when both the process is finished and we've
176        // drained all the output.
177
178        let process_finished = self.outcome.is_some();
179
180        let post_action = if process_finished && self.output_finished {
181            let _ = callback(ListenEvent::End(self.outcome.take().unwrap()), &mut ());
182            // No events should be issued after this point.
183            PostAction::Remove
184        } else {
185            channel_post_action
186        };
187
188        Ok(post_action)
189    }
190
191    fn register(
192        &mut self,
193        poll: &mut calloop::Poll,
194        token_factory: &mut calloop::TokenFactory,
195    ) -> calloop::Result<()> {
196        calloop::batch_register!(poll, token_factory, self.executor, self.receiver)
197    }
198
199    fn reregister(
200        &mut self,
201        poll: &mut calloop::Poll,
202        token_factory: &mut calloop::TokenFactory,
203    ) -> calloop::Result<()> {
204        calloop::batch_reregister!(poll, token_factory, self.executor, self.receiver)
205    }
206
207    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
208        calloop::batch_unregister!(poll, self.executor, self.receiver)
209    }
210}
211
212/// The events generated by [`SubprocListen`].
213#[derive(Debug)]
214pub enum ListenEvent {
215    /// The subprocess was started.
216    Start,
217    /// A line was received from the subprocess.
218    Line(String),
219    /// The subprocess has ended.
220    End(Result<(), ErrorEvent>),
221}
222
223/// The async function that actually listens to the subprocess's stdout, line
224/// buffers it, sends it back over the channel, and checks the "kill" signal.
225async fn subproc_listener(
226    command: Command,
227    sender: calloop::channel::Sender<ListenEvent>,
228    stopper: futures::channel::oneshot::Receiver<()>,
229) -> CommandResult {
230    let command_debug_str = format!("{:?}", command);
231
232    let mut async_command: async_process::Command = command.into();
233
234    // If we cannot spawn the subprocess, return an error immediately.
235    // The sender will be dropped upon return.
236    let async_child = async_command.stdout(async_process::Stdio::piped()).spawn();
237
238    let mut async_child = match async_child {
239        Ok(child) => child,
240        Err(error) => {
241            return CommandResult {
242                command: command_debug_str,
243                result: Err(ErrorEvent::IoError(error)),
244            }
245        }
246    };
247
248    // The subprocess has started.
249    sender
250        .send(ListenEvent::Start)
251        .expect("Could not send start message over internal channel");
252
253    // A StreamExt that gives us the line-buffered stdout.
254    let lines = BufReader::new(
255        async_child
256            .stdout
257            .take()
258            .expect("Cannot access subprocess stdout"),
259    )
260    .lines();
261
262    // This will stop the stream prematurely if stopper resolves to a value,
263    // which it will do if the caller sends a value over the one-shot channel.
264    let mut lines_or_stop = lines.take_until(stopper);
265
266    // Continually process stdout lines as they become available.
267    while let Some(line) = lines_or_stop.next().await {
268        match line {
269            // Stream produced another line.
270            Ok(line) => {
271                sender
272                    .send(ListenEvent::Line(line))
273                    .expect("Could not send data over internal channel");
274            }
275            // Stream produced an error.
276            Err(error) => {
277                log::warn!(
278                    "Error in output stream for subprocess: {}",
279                    command_debug_str
280                );
281                log::warn!("Error: {:#?}", error);
282                break;
283            }
284        }
285    }
286
287    // The stream has either ended (EOF by subprocess) or produced an
288    // error. Kill it (possibly again).
289    if let Err(error) = async_child.kill() {
290        log::warn!("Error killing subprocess: {}", command_debug_str);
291        log::warn!("Error: {:#?}", error);
292    } else {
293        log::trace!("Killed subprocess: {}", command_debug_str);
294    }
295
296    // Wait for the subprocess to end and return the result.
297    match async_child.status().await {
298        Ok(status) => {
299            if status.success() {
300                CommandResult {
301                    command: command_debug_str,
302                    result: Ok(()),
303                }
304            } else {
305                CommandResult {
306                    command: command_debug_str,
307                    result: Err(ErrorEvent::SubprocError(status)),
308                }
309            }
310        }
311
312        Err(error) => CommandResult {
313            command: command_debug_str,
314            result: Err(ErrorEvent::IoError(error)),
315        },
316    }
317}