calloop_subproc/listen.rs
1use super::{Command, CommandResult, ErrorEvent, LaunchError};
2use calloop::{transient::TransientSource, PostAction};
3use futures::{io::BufReader, AsyncBufReadExt, StreamExt};
4
5/// An event source that runs a subprocess and generates events for lines of
6/// output.
7///
8/// This event source will run a command as a subprocess, generate
9/// [`ListenEvent::Line`] events for every line it prints to stdout, and one
10/// final [`ListenEvent::End`] event when it ends.
11///
12/// Processes that run indefinitely can be killed (using `SIGKILL` ie. 9 on
13/// Linux) by returning `true` from the callback provided to `process_events()`
14/// or by calling the [`kill()`](Self::kill) method. Note that there may be more
15/// `Line` events generated after this, depending on the order in which the kill
16/// request and remaining output are processed.
17///
18/// After the subprocess has been ended and the final `End` event has been
19/// delivered, this event source should be removed from the event loop.
20pub struct SubprocListen {
21 /// The executor (event source) that actually runs the command and generates
22 /// events for us.
23 executor: calloop::futures::Executor<CommandResult>,
24
25 /// When the executor is finished, we need to stash its result until the
26 /// output is finished.
27 outcome: Option<Result<(), ErrorEvent>>,
28
29 /// Use a transient source because the sender can be dropped by the async
30 /// executor.
31 receiver: TransientSource<calloop::channel::Channel<ListenEvent>>,
32
33 /// Used to stop the subprocess before the end of the (potentially infinite)
34 /// stream of stdout.
35 stopper: Option<futures::channel::oneshot::Sender<()>>,
36
37 /// Set to true when all of the output has been received.
38 output_finished: bool,
39}
40
41impl SubprocListen {
42 /// Takes a command and schedules a subprocess to be run asychronously.
43 ///
44 /// See the note on the root module regarding the trait bounds if they're
45 /// confusing.
46 pub fn new(command: Command) -> calloop::Result<Self> {
47 let (executor, scheduler) = calloop::futures::executor()?;
48 let (receiver, stopper) = Self::schedule_command(scheduler, command)?;
49
50 Ok(Self {
51 executor,
52 outcome: None,
53 receiver: receiver.into(),
54 stopper: Some(stopper),
55 output_finished: false,
56 })
57 }
58
59 pub fn kill(&mut self) {
60 if let Some(stopper) = self.stopper.take() {
61 stopper
62 .send(())
63 .expect("Could not send internal message to stop subprocess");
64 }
65
66 // Otherwise the sender has been dropped, which will happen shortly
67 // before the executor finishes and returns the status.
68 }
69
70 /// Schedules a single command in our async executor, and also sets up a
71 /// channel to send buffered output back. Sending `()` over the `oneshot`
72 /// channel will cause the underlying subprocess to be killed.
73 fn schedule_command(
74 scheduler: calloop::futures::Scheduler<CommandResult>,
75 command: Command,
76 ) -> std::io::Result<(
77 calloop::channel::Channel<ListenEvent>,
78 futures::channel::oneshot::Sender<()>,
79 )> {
80 let command_debug_str_for_here = format!("{:?}", command);
81
82 let (sender, receiver) = calloop::channel::channel();
83 let (stopper, stop_rx) = futures::channel::oneshot::channel();
84
85 let async_exec = subproc_listener(command, sender, stop_rx);
86
87 scheduler.schedule(async_exec).map_err(|_| {
88 std::io::Error::new(
89 std::io::ErrorKind::InvalidData,
90 format!(
91 "Could not schedule command: {:?}",
92 command_debug_str_for_here
93 ),
94 )
95 })?;
96
97 Ok((receiver, stopper))
98 }
99}
100
101impl calloop::EventSource for SubprocListen {
102 type Event = ListenEvent;
103
104 type Metadata = ();
105
106 // Callback should return `true` to end the process.
107 type Ret = bool;
108
109 type Error = LaunchError;
110
111 fn process_events<F>(
112 &mut self,
113 readiness: calloop::Readiness,
114 token: calloop::Token,
115 mut callback: F,
116 ) -> Result<calloop::PostAction, Self::Error>
117 where
118 F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
119 {
120 // The executor will provide an event when the subprocess is finished,
121 // but we need to be careful! It is possible to see the executor finish
122 // before we actually receive all the output from the command, because
123 // they arrive via two different event sources. We need to make sure
124 // that we get all of the output before finishing.
125
126 self.executor
127 .process_events(readiness, token, |cmd_res, _| {
128 // At this point:
129 // - the scheduler has been dropped (it was dropped after the
130 // constructor finished)
131 // - the future being run by the executor has finished (by
132 // definition of execution reaching here)
133 // - the sending end of self.receiver has been dropped (but note
134 // the channel itself may still have messages that might need
135 // processing!)
136 // - the subprocess has been killed and will be cleaned up up by
137 // async_process
138 // - we provide no way to schedule more futures in the executor
139 //
140 // So we set self.outcome and wait for the output message
141 // channel to reach its end.
142
143 // The command has already ended, so we don't care about the
144 // command string except to log it.
145 let CommandResult { command, result } = cmd_res;
146 log::trace!("Subprocess ended: {}", command);
147
148 // Drop the stopper, we can't use it any more.
149 self.stopper.take();
150
151 self.outcome = Some(result);
152 })?;
153
154 // We need to keep track of whether any callback call returns true, not
155 // just the last one.
156 let mut kill = false;
157
158 let channel_post_action = self.receiver.process_events(readiness, token, |msg, _| {
159 match msg {
160 calloop::channel::Event::Msg(event) => {
161 let this_kill = callback(event, &mut ());
162 // Avoids problems with a 'true' being followed by a 'false' if
163 // `process_events()` calls the callback multiple times in one
164 // go.
165 kill = kill || this_kill;
166 }
167 calloop::channel::Event::Closed => self.output_finished = true,
168 }
169 })?;
170
171 if kill {
172 self.kill();
173 }
174
175 // Send the final event when both the process is finished and we've
176 // drained all the output.
177
178 let process_finished = self.outcome.is_some();
179
180 let post_action = if process_finished && self.output_finished {
181 let _ = callback(ListenEvent::End(self.outcome.take().unwrap()), &mut ());
182 // No events should be issued after this point.
183 PostAction::Remove
184 } else {
185 channel_post_action
186 };
187
188 Ok(post_action)
189 }
190
191 fn register(
192 &mut self,
193 poll: &mut calloop::Poll,
194 token_factory: &mut calloop::TokenFactory,
195 ) -> calloop::Result<()> {
196 calloop::batch_register!(poll, token_factory, self.executor, self.receiver)
197 }
198
199 fn reregister(
200 &mut self,
201 poll: &mut calloop::Poll,
202 token_factory: &mut calloop::TokenFactory,
203 ) -> calloop::Result<()> {
204 calloop::batch_reregister!(poll, token_factory, self.executor, self.receiver)
205 }
206
207 fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
208 calloop::batch_unregister!(poll, self.executor, self.receiver)
209 }
210}
211
212/// The events generated by [`SubprocListen`].
213#[derive(Debug)]
214pub enum ListenEvent {
215 /// The subprocess was started.
216 Start,
217 /// A line was received from the subprocess.
218 Line(String),
219 /// The subprocess has ended.
220 End(Result<(), ErrorEvent>),
221}
222
223/// The async function that actually listens to the subprocess's stdout, line
224/// buffers it, sends it back over the channel, and checks the "kill" signal.
225async fn subproc_listener(
226 command: Command,
227 sender: calloop::channel::Sender<ListenEvent>,
228 stopper: futures::channel::oneshot::Receiver<()>,
229) -> CommandResult {
230 let command_debug_str = format!("{:?}", command);
231
232 let mut async_command: async_process::Command = command.into();
233
234 // If we cannot spawn the subprocess, return an error immediately.
235 // The sender will be dropped upon return.
236 let async_child = async_command.stdout(async_process::Stdio::piped()).spawn();
237
238 let mut async_child = match async_child {
239 Ok(child) => child,
240 Err(error) => {
241 return CommandResult {
242 command: command_debug_str,
243 result: Err(ErrorEvent::IoError(error)),
244 }
245 }
246 };
247
248 // The subprocess has started.
249 sender
250 .send(ListenEvent::Start)
251 .expect("Could not send start message over internal channel");
252
253 // A StreamExt that gives us the line-buffered stdout.
254 let lines = BufReader::new(
255 async_child
256 .stdout
257 .take()
258 .expect("Cannot access subprocess stdout"),
259 )
260 .lines();
261
262 // This will stop the stream prematurely if stopper resolves to a value,
263 // which it will do if the caller sends a value over the one-shot channel.
264 let mut lines_or_stop = lines.take_until(stopper);
265
266 // Continually process stdout lines as they become available.
267 while let Some(line) = lines_or_stop.next().await {
268 match line {
269 // Stream produced another line.
270 Ok(line) => {
271 sender
272 .send(ListenEvent::Line(line))
273 .expect("Could not send data over internal channel");
274 }
275 // Stream produced an error.
276 Err(error) => {
277 log::warn!(
278 "Error in output stream for subprocess: {}",
279 command_debug_str
280 );
281 log::warn!("Error: {:#?}", error);
282 break;
283 }
284 }
285 }
286
287 // The stream has either ended (EOF by subprocess) or produced an
288 // error. Kill it (possibly again).
289 if let Err(error) = async_child.kill() {
290 log::warn!("Error killing subprocess: {}", command_debug_str);
291 log::warn!("Error: {:#?}", error);
292 } else {
293 log::trace!("Killed subprocess: {}", command_debug_str);
294 }
295
296 // Wait for the subprocess to end and return the result.
297 match async_child.status().await {
298 Ok(status) => {
299 if status.success() {
300 CommandResult {
301 command: command_debug_str,
302 result: Ok(()),
303 }
304 } else {
305 CommandResult {
306 command: command_debug_str,
307 result: Err(ErrorEvent::SubprocError(status)),
308 }
309 }
310 }
311
312 Err(error) => CommandResult {
313 command: command_debug_str,
314 result: Err(ErrorEvent::IoError(error)),
315 },
316 }
317}