// calloop-subproc 1.0.0
//
// Subprocess support for the Calloop event loop
// Documentation
use super::{Command, CommandResult, ErrorEvent, LaunchError};
use calloop::{transient::TransientSource, PostAction};
use futures::{io::BufReader, AsyncBufReadExt, StreamExt};

/// An event source that runs a subprocess and generates events for lines of
/// output.
///
/// This event source will run a command as a subprocess, generate one
/// [`ListenEvent::Start`] event once the subprocess has been spawned, a
/// [`ListenEvent::Line`] event for every line it prints to stdout, and one
/// final [`ListenEvent::End`] event when it ends.
///
/// Processes that run indefinitely can be killed (using `SIGKILL`, i.e. 9 on
/// Linux) by returning `true` from the callback provided to `process_events()`
/// or by calling the [`kill()`](Self::kill) method. Note that there may be more
/// `Line` events generated after this, depending on the order in which the kill
/// request and remaining output are processed.
///
/// After the subprocess has been ended and the final `End` event has been
/// delivered, this event source should be removed from the event loop;
/// `process_events()` returns `PostAction::Remove` at that point.
pub struct SubprocListen {
    /// The executor (event source) that actually runs the command and generates
    /// events for us.
    executor: calloop::futures::Executor<CommandResult>,

    /// When the executor is finished, we need to stash its result until the
    /// output is finished. It is `None` until the executor reports a result,
    /// and is taken again when the `End` event is delivered.
    outcome: Option<Result<(), ErrorEvent>>,

    /// Use a transient source because the sender can be dropped by the async
    /// executor.
    receiver: TransientSource<calloop::channel::Channel<ListenEvent>>,

    /// Used to stop the subprocess before the end of the (potentially infinite)
    /// stream of stdout. Becomes `None` once a kill has been requested or once
    /// the executor has reported the subprocess result.
    stopper: Option<futures::channel::oneshot::Sender<()>>,

    /// Set to true when all of the output has been received, ie. when the
    /// output channel reports that it has been closed.
    output_finished: bool,
}

impl SubprocListen {
    /// Takes a command and schedules a subprocess to be run asychronously.
    ///
    /// See the note on the root module regarding the trait bounds if they're
    /// confusing.
    pub fn new(command: Command) -> calloop::Result<Self> {
        let (executor, scheduler) = calloop::futures::executor()?;
        let (receiver, stopper) = Self::schedule_command(scheduler, command)?;

        Ok(Self {
            executor,
            outcome: None,
            receiver: receiver.into(),
            stopper: Some(stopper),
            output_finished: false,
        })
    }

    pub fn kill(&mut self) {
        if let Some(stopper) = self.stopper.take() {
            stopper
                .send(())
                .expect("Could not send internal message to stop subprocess");
        }

        // Otherwise the sender has been dropped, which will happen shortly
        // before the executor finishes and returns the status.
    }

    /// Schedules a single command in our async executor, and also sets up a
    /// channel to send buffered output back. Sending `()` over the `oneshot`
    /// channel will cause the underlying subprocess to be killed.
    fn schedule_command(
        scheduler: calloop::futures::Scheduler<CommandResult>,
        command: Command,
    ) -> std::io::Result<(
        calloop::channel::Channel<ListenEvent>,
        futures::channel::oneshot::Sender<()>,
    )> {
        let command_debug_str_for_here = format!("{:?}", command);

        let (sender, receiver) = calloop::channel::channel();
        let (stopper, stop_rx) = futures::channel::oneshot::channel();

        let async_exec = subproc_listener(command, sender, stop_rx);

        scheduler.schedule(async_exec).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                format!(
                    "Could not schedule command: {:?}",
                    command_debug_str_for_here
                ),
            )
        })?;

        Ok((receiver, stopper))
    }
}

impl calloop::EventSource for SubprocListen {
    type Event = ListenEvent;

    type Metadata = ();

    /// The callback returns `true` to ask for the subprocess to be ended.
    type Ret = bool;

    type Error = LaunchError;

    /// Forwards subprocess output to `callback` and delivers the final
    /// [`ListenEvent::End`] once the subprocess result is known and the output
    /// channel has been closed.
    ///
    /// Returns `PostAction::Remove` after the `End` event has been delivered,
    /// and otherwise whatever post action the output channel asked for.
    fn process_events<F>(
        &mut self,
        readiness: calloop::Readiness,
        token: calloop::Token,
        mut callback: F,
    ) -> Result<calloop::PostAction, Self::Error>
    where
        F: FnMut(Self::Event, &mut Self::Metadata) -> Self::Ret,
    {
        // Two independent sources feed this method: the executor (which reports
        // the subprocess result) and the channel (which carries its output).
        // Their relative order is not guaranteed, so the result may show up
        // while output is still queued. The result is therefore parked in
        // `self.outcome` until the channel has been closed.
        self.executor
            .process_events(readiness, token, |finished, _| {
                // Reaching this callback means the future has run to
                // completion: the sending half of the output channel is gone
                // (although queued messages may still be pending), the
                // subprocess has been killed and nothing else can be scheduled
                // on this executor. The command string is only used for
                // logging from here on.
                log::trace!("Subprocess ended: {}", finished.command);

                // The stopper is useless from now on, so let go of it.
                self.stopper = None;

                self.outcome = Some(finished.result);
            })?;

        // One call may deliver several messages. A kill request from any of
        // them must stick even if a later invocation of the callback returns
        // `false`.
        let mut kill_requested = false;

        let channel_post_action = self.receiver.process_events(readiness, token, |msg, _| {
            match msg {
                calloop::channel::Event::Closed => self.output_finished = true,
                calloop::channel::Event::Msg(event) => {
                    kill_requested |= callback(event, &mut ());
                }
            }
        })?;

        if kill_requested {
            self.kill();
        }

        // The final event needs both conditions: all output drained and the
        // subprocess result available.
        if !self.output_finished {
            return Ok(channel_post_action);
        }

        match self.outcome.take() {
            Some(outcome) => {
                let _ = callback(ListenEvent::End(outcome), &mut ());
                // No events should be issued after this point.
                Ok(PostAction::Remove)
            }
            None => Ok(channel_post_action),
        }
    }

    /// Registers both inner sources (executor and output channel).
    fn register(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        calloop::batch_register!(poll, token_factory, self.executor, self.receiver)
    }

    /// Re-registers both inner sources (executor and output channel).
    fn reregister(
        &mut self,
        poll: &mut calloop::Poll,
        token_factory: &mut calloop::TokenFactory,
    ) -> calloop::Result<()> {
        calloop::batch_reregister!(poll, token_factory, self.executor, self.receiver)
    }

    /// Unregisters both inner sources (executor and output channel).
    fn unregister(&mut self, poll: &mut calloop::Poll) -> calloop::Result<()> {
        calloop::batch_unregister!(poll, self.executor, self.receiver)
    }
}

/// The events generated by [`SubprocListen`].
#[derive(Debug)]
pub enum ListenEvent {
    /// The subprocess was started. This is sent once, after the subprocess has
    /// been spawned successfully; it is not sent if spawning fails.
    Start,
    /// A line was received from the subprocess's stdout.
    Line(String),
    /// The subprocess has ended. Carries `Ok(())` if it exited successfully,
    /// or the error describing why it could not be run or how it failed.
    End(Result<(), ErrorEvent>),
}

/// The async function that actually listens to the subprocess's stdout, line
/// buffers it, sends it back over the channel, and checks the "kill" signal.
async fn subproc_listener(
    command: Command,
    sender: calloop::channel::Sender<ListenEvent>,
    stopper: futures::channel::oneshot::Receiver<()>,
) -> CommandResult {
    let command_debug_str = format!("{:?}", command);

    let mut async_command: async_process::Command = command.into();

    // If we cannot spawn the subprocess, return an error immediately.
    // The sender will be dropped upon return.
    let async_child = async_command.stdout(async_process::Stdio::piped()).spawn();

    let mut async_child = match async_child {
        Ok(child) => child,
        Err(error) => {
            return CommandResult {
                command: command_debug_str,
                result: Err(ErrorEvent::IoError(error)),
            }
        }
    };

    // The subprocess has started.
    sender
        .send(ListenEvent::Start)
        .expect("Could not send start message over internal channel");

    // A StreamExt that gives us the line-buffered stdout.
    let lines = BufReader::new(
        async_child
            .stdout
            .take()
            .expect("Cannot access subprocess stdout"),
    )
    .lines();

    // This will stop the stream prematurely if stopper resolves to a value,
    // which it will do if the caller sends a value over the one-shot channel.
    let mut lines_or_stop = lines.take_until(stopper);

    // Continually process stdout lines as they become available.
    while let Some(line) = lines_or_stop.next().await {
        match line {
            // Stream produced another line.
            Ok(line) => {
                sender
                    .send(ListenEvent::Line(line))
                    .expect("Could not send data over internal channel");
            }
            // Stream produced an error.
            Err(error) => {
                log::warn!(
                    "Error in output stream for subprocess: {}",
                    command_debug_str
                );
                log::warn!("Error: {:#?}", error);
                break;
            }
        }
    }

    // The stream has either ended (EOF by subprocess) or produced an
    // error. Kill it (possibly again).
    if let Err(error) = async_child.kill() {
        log::warn!("Error killing subprocess: {}", command_debug_str);
        log::warn!("Error: {:#?}", error);
    } else {
        log::trace!("Killed subprocess: {}", command_debug_str);
    }

    // Wait for the subprocess to end and return the result.
    match async_child.status().await {
        Ok(status) => {
            if status.success() {
                CommandResult {
                    command: command_debug_str,
                    result: Ok(()),
                }
            } else {
                CommandResult {
                    command: command_debug_str,
                    result: Err(ErrorEvent::SubprocError(status)),
                }
            }
        }

        Err(error) => CommandResult {
            command: command_debug_str,
            result: Err(ErrorEvent::IoError(error)),
        },
    }
}